diff --git a/.github/workflows/ci-test.sh b/.github/workflows/ci-test.sh
new file mode 100755
index 00000000000..87020d6b2ab
--- /dev/null
+++ b/.github/workflows/ci-test.sh
@@ -0,0 +1,74 @@
+if [ ${CLEAN:-0} -ne 0 ]; then
+    cargo clean
+fi
+
+TEST_THREAD=
+
+if [ ${GENERATE_COV:-0} -ne 0 ]; then
+    export RUST_BACKTRACE=1
+    export RUSTFLAGS="-Zinstrument-coverage"
+    export LLVM_PROFILE_FILE="tidb-engine-ext-%p-%m.profraw"
+    rustup component list | grep "llvm-tools-preview-x86_64-unknown-linux-gnu (installed)"
+    if [ $? -ne 0 ]; then
+        rustup component add llvm-tools-preview
+    fi
+    cargo install --list | grep grcov
+    if [ $? -ne 0 ]; then
+        cargo install grcov
+    fi
+    export TEST_THREAD="--test-threads 1"
+    find . -name "*.profraw" -type f -delete
+fi
+
+cargo test --package tests --test failpoints -- cases::test_normal $TEST_THREAD && \
+cargo test --package tests --test failpoints -- cases::test_bootstrap $TEST_THREAD && \
+cargo test --package tests --test failpoints -- cases::test_compact_log $TEST_THREAD && \
+cargo test --package tests --test failpoints -- cases::test_early_apply $TEST_THREAD && \
+cargo test --package tests --test failpoints -- cases::test_encryption $TEST_THREAD && \
+cargo test --package tests --test failpoints -- cases::test_pd_client $TEST_THREAD && \
+cargo test --package tests --test failpoints -- cases::test_pending_peers $TEST_THREAD && \
+cargo test --package tests --test failpoints -- cases::test_transaction $TEST_THREAD && \
+cargo test --package tests --test failpoints -- cases::test_cmd_epoch_checker $TEST_THREAD && \
+cargo test --package tests --test failpoints -- cases::test_disk_full $TEST_THREAD && \
+cargo test --package tests --test failpoints -- cases::test_stale_peer $TEST_THREAD && \
+cargo test --package tests --test failpoints -- cases::test_import_service $TEST_THREAD && \
+cargo test --package tests --test failpoints -- cases::test_split_region --skip test_report_approximate_size_after_split_check $TEST_THREAD && \
+cargo test --package tests --test failpoints -- cases::test_snap $TEST_THREAD && \
+cargo test --package tests --test failpoints -- cases::test_merge $TEST_THREAD && \
+cargo test --package tests --test failpoints -- cases::test_replica_read $TEST_THREAD && \
+# TiFlash does not support stale read currently
+#cargo test --package tests --test failpoints -- cases::test_replica_stale_read $TEST_THREAD && \
+cargo test --package tests --test failpoints -- cases::test_server $TEST_THREAD
+
+cargo test --package tests --test integrations -- raftstore::test_bootstrap $TEST_THREAD && \
+cargo test --package tests --test integrations -- raftstore::test_clear_stale_data $TEST_THREAD && \
+cargo test --package tests --test integrations -- raftstore::test_compact_after_delete $TEST_THREAD && \
+cargo test --package tests --test integrations -- raftstore::test_compact_log $TEST_THREAD && \
+# Sometimes fails
+#cargo test --package tests --test integrations -- raftstore::test_conf_change $TEST_THREAD && \
+cargo test --package tests --test integrations -- raftstore::test_early_apply $TEST_THREAD && \
+cargo test --package tests --test integrations -- raftstore::test_hibernate $TEST_THREAD && \
+cargo test --package tests --test integrations -- raftstore::test_joint_consensus $TEST_THREAD && \
+cargo test --package tests --test integrations -- raftstore::test_replica_read $TEST_THREAD && \
+cargo test --package tests --test integrations -- raftstore::test_snap $TEST_THREAD && \
+# Sometimes fails
+#cargo test --package tests --test integrations 
-- raftstore::test_split_region $TEST_THREAD && \ +cargo test --package tests --test integrations -- raftstore::test_stale_peer $TEST_THREAD && \ +cargo test --package tests --test integrations -- raftstore::test_status_command $TEST_THREAD && \ +cargo test --package tests --test integrations -- raftstore::test_prevote $TEST_THREAD && \ +cargo test --package tests --test integrations -- raftstore::test_region_change_observer $TEST_THREAD && \ +cargo test --package tests --test integrations -- raftstore::test_region_heartbeat $TEST_THREAD && \ +cargo test --package tests --test integrations -- raftstore::test_region_info_accessor $TEST_THREAD && \ +cargo test --package tests --test integrations -- raftstore::test_transfer_leader $TEST_THREAD && \ +cargo test --package tests --test integrations -- raftstore::test_single $TEST_THREAD && \ +# Sometimes fails +cargo test --package tests --test integrations -- raftstore::test_merge $TEST_THREAD && \ +cargo test --package tests --test integrations -- raftstore::test_tombstone $TEST_THREAD && \ +cargo test --package tests --test integrations -- server::kv_service::test_read_index_check_memory_locks $TEST_THREAD && \ +cargo test --package tests --test integrations -- raftstore::test_batch_read_index $TEST_THREAD && \ +cargo test --package tests --test integrations -- import::test_sst_service::test_upload_sst $TEST_THREAD && \ + + +if [ ${GENERATE_COV:-0} -ne 0 ]; then + grcov . --binary-path target/debug/ . -t html --branch --ignore-not-existing -o ./coverage/ +fi \ No newline at end of file diff --git a/.github/workflows/pr-ci.yml b/.github/workflows/pr-ci.yml index f362cde3af6..d23dc2fc47d 100644 --- a/.github/workflows/pr-ci.yml +++ b/.github/workflows/pr-ci.yml @@ -55,18 +55,4 @@ jobs: # export RUSTC_WRAPPER=~/.cargo/bin/sccache # make test # make debug - cargo check - cargo test --package tests --test failpoints cases::test_normal - cargo test --package tests --test failpoints cases::test_bootstrap - cargo test --package tests --test failpoints cases::test_compact_log - cargo test --package tests --test failpoints cases::test_early_apply - cargo test --package tests --test failpoints cases::test_encryption - cargo test --package tests --test failpoints cases::test_pd_client - cargo test --package tests --test failpoints cases::test_pending_peers - cargo test --package tests --test failpoints cases::test_transaction - cargo test --package tests --test failpoints cases::test_cmd_epoch_checker - cargo test --package tests --test failpoints cases::test_disk_full - cargo test --package tests --test failpoints cases::test_snap - cargo test --package tests --test failpoints cases::test_merge - cargo test --package tests --test failpoints cases::test_stale_peer - cargo test --package tests --test failpoints cases::test_import_service + CLEAN=1 GENERATE_COV=0 sh .github/workflows/ci-test.sh diff --git a/.gitignore b/.gitignore index af89a9bef28..9a29ae138d1 100644 --- a/.gitignore +++ b/.gitignore @@ -39,4 +39,4 @@ fuzz-incremental/ /last_tikv.toml /raft/ core.* - +*.profraw diff --git a/components/raftstore/src/engine_store_ffi/mod.rs b/components/raftstore/src/engine_store_ffi/mod.rs index c692e236dc2..18661725b25 100644 --- a/components/raftstore/src/engine_store_ffi/mod.rs +++ b/components/raftstore/src/engine_store_ffi/mod.rs @@ -65,7 +65,7 @@ impl RaftStoreProxy { } impl RaftStoreProxyPtr { - unsafe fn as_ref(&self) -> &RaftStoreProxy { + pub unsafe fn as_ref(&self) -> &RaftStoreProxy { &*(self.inner as *const RaftStoreProxy) } pub fn 
is_null(&self) -> bool {
@@ -544,6 +544,11 @@ fn get_engine_store_server_helper() -> &'static EngineStoreServerHelper {
     gen_engine_store_server_helper(unsafe { ENGINE_STORE_SERVER_HELPER_PTR })
 }
 
+#[cfg(feature = "test-raftstore-proxy")]
+pub fn get_engine_store_server_helper_ptr() -> isize {
+    unsafe { ENGINE_STORE_SERVER_HELPER_PTR }
+}
+
 pub fn gen_engine_store_server_helper(
     engine_store_server_helper: isize,
 ) -> &'static EngineStoreServerHelper {
diff --git a/components/raftstore/src/store/fsm/apply.rs b/components/raftstore/src/store/fsm/apply.rs
index 780ed7ce237..338455989d9 100644
--- a/components/raftstore/src/store/fsm/apply.rs
+++ b/components/raftstore/src/store/fsm/apply.rs
@@ -932,6 +932,14 @@ where
                 break;
             }
 
+            if cfg!(feature = "test-raftstore-proxy") {
+                // The `expect_index != entry.get_index()` check below may occasionally fail,
+                // so log the apply state here to give more context when it does.
+                debug!(
+                    "currently apply_state is {:?} entry index {}",
+                    self.apply_state,
+                    entry.get_index()
+                );
+            }
             let expect_index = self.apply_state.get_applied_index() + 1;
             if expect_index != entry.get_index() {
                 panic!(
@@ -1494,10 +1502,35 @@ where
         ApplyResult,
         EngineStoreApplyRes,
     )> {
+        fail_point!(
+            "on_apply_write_cmd",
+            cfg!(release) || self.id() == 3,
+            |_| {
+                unimplemented!();
+            }
+        );
         const NONE_STR: &str = "";
         let requests = req.get_requests();
         let mut ssts = vec![];
         let mut cmds = WriteCmds::with_capacity(requests.len());
+        let resp = if cfg!(feature = "test-raftstore-proxy") {
+            let mut responses = Vec::with_capacity(requests.len());
+            for req in requests {
+                let mut r = Response::default();
+                r.set_cmd_type(req.get_cmd_type());
+                responses.push(r);
+            }
+
+            let mut resp = RaftCmdResponse::default();
+            if !req.get_header().get_uuid().is_empty() {
+                let uuid = req.get_header().get_uuid().to_vec();
+                resp.mut_header().set_uuid(uuid);
+            }
+            resp.set_responses(responses.into());
+            resp
+        } else {
+            RaftCmdResponse::new()
+        };
         for req in requests {
             let cmd_type = req.get_cmd_type();
             match cmd_type {
@@ -1509,6 +1542,9 @@ where
                         self.metrics.size_diff_hint += key.len() as i64 + value.len() as i64;
                         self.metrics.written_bytes += key.len() as u64 + value.len() as u64;
                         self.metrics.written_keys += 1;
+                    } else {
+                        self.metrics.lock_cf_written_bytes += key.len() as u64;
+                        self.metrics.lock_cf_written_bytes += value.len() as u64;
                     }
                     cmds.push(key, value, WriteCmdType::Put, cf);
                 }
@@ -1521,6 +1557,8 @@ where
                         self.metrics.delete_keys_hint += 1;
                         self.metrics.written_bytes += key.len() as u64;
                         self.metrics.written_keys += 1;
+                    } else {
+                        self.metrics.lock_cf_written_bytes += key.len() as u64;
                     }
                     cmds.push(key, NONE_STR.as_ref(), WriteCmdType::Del, cf);
                 }
@@ -1564,11 +1602,7 @@ where
                     "pending_ssts" => ?self.pending_clean_ssts
                 );
 
-                Ok((
-                    RaftCmdResponse::new(),
-                    ApplyResult::None,
-                    EngineStoreApplyRes::None,
-                ))
+                Ok((resp, ApplyResult::None, EngineStoreApplyRes::None))
             }
             EngineStoreApplyRes::NotFound | EngineStoreApplyRes::Persist => {
                 ssts.append(&mut self.pending_clean_ssts);
@@ -1582,7 +1616,7 @@ where
                 );
                 ctx.delete_ssts.append(&mut ssts.clone());
                 Ok((
-                    RaftCmdResponse::new(),
+                    resp,
                     ApplyResult::Res(ExecResult::IngestSst { ssts }),
                     EngineStoreApplyRes::Persist,
                 ))
@@ -1599,7 +1633,7 @@ where
                 ),
             )
         };
-        Ok((RaftCmdResponse::new(), ApplyResult::None, flash_res))
+        Ok((resp, ApplyResult::None, flash_res))
     };
 }
 }
@@ -1883,14 +1917,14 @@ where
         match change_type {
             ConfChangeType::AddNode => {
-                let add_ndoe_fp = || {
+                let add_node_fp = || {
                     fail_point!(
                         "apply_on_add_node_1_2",
                         self.id == 2 && self.region_id() == 1,
                         |_| {}
) }; - add_ndoe_fp(); + add_node_fp(); PEER_ADMIN_CMD_COUNTER_VEC .with_label_values(&["add_peer", "all"]) diff --git a/components/raftstore/src/store/fsm/store.rs b/components/raftstore/src/store/fsm/store.rs index 6d31033f738..9fbf5421860 100644 --- a/components/raftstore/src/store/fsm/store.rs +++ b/components/raftstore/src/store/fsm/store.rs @@ -1408,7 +1408,14 @@ impl RaftBatchSystem { fail_point!("after_shutdown_apply"); self.system.shutdown(); if let Some(h) = handle { - h.join().unwrap(); + if cfg!(feature = "test-raftstore-proxy") { + let res = h.join(); + if res.is_err() { + debug!("thread shutdown with error {:?}", res.err()); + } + } else { + h.join().unwrap(); + } } workers.coprocessor_host.shutdown(); workers.cleanup_worker.stop(); diff --git a/components/test_raftstore/src/cluster.rs b/components/test_raftstore/src/cluster.rs index 9257b899ab3..3d827b8a8d4 100644 --- a/components/test_raftstore/src/cluster.rs +++ b/components/test_raftstore/src/cluster.rs @@ -38,8 +38,9 @@ use tikv_util::thread_group::GroupProperties; use tikv_util::HandyRwLock; use super::*; +use mock_engine_store::make_new_region; use mock_engine_store::EngineStoreServerWrap; -use std::sync::atomic::{AtomicBool, AtomicU8}; +use std::sync::atomic::AtomicU8; use tikv_util::sys::SysQuota; use tikv_util::time::ThreadReadId; @@ -160,7 +161,50 @@ pub struct Cluster { pub sim: Arc>, pub pd_client: Arc, pub ffi_helper_set: HashMap, - pub global_engine_helper_set: Option, +} + +static mut GLOBAL_ENGINE_HELPER_SET: Option = None; +static START: std::sync::Once = std::sync::Once::new(); + +pub unsafe fn get_global_engine_helper_set() -> &'static Option { + &GLOBAL_ENGINE_HELPER_SET +} + +fn make_global_ffi_helper_set_no_bind() -> (EngineHelperSet, *const u8) { + let mut engine_store_server = Box::new(mock_engine_store::EngineStoreServer::new(99999, None)); + let engine_store_server_wrap = Box::new(mock_engine_store::EngineStoreServerWrap::new( + &mut *engine_store_server, + None, + 0, + )); + let engine_store_server_helper = Box::new(mock_engine_store::gen_engine_store_server_helper( + std::pin::Pin::new(&*engine_store_server_wrap), + )); + let ptr = &*engine_store_server_helper + as *const raftstore::engine_store_ffi::EngineStoreServerHelper as *mut u8; + // Will mutate ENGINE_STORE_SERVER_HELPER_PTR + ( + EngineHelperSet { + engine_store_server, + engine_store_server_wrap, + engine_store_server_helper, + }, + ptr, + ) +} + +pub fn init_global_ffi_helper_set() { + unsafe { + START.call_once(|| { + assert_eq!( + raftstore::engine_store_ffi::get_engine_store_server_helper_ptr(), + 0 + ); + let (set, ptr) = make_global_ffi_helper_set_no_bind(); + raftstore::engine_store_ffi::init_engine_store_server_helper(ptr); + GLOBAL_ENGINE_HELPER_SET = Some(set); + }); + } } impl Cluster { @@ -188,7 +232,6 @@ impl Cluster { sim, pd_client, ffi_helper_set: HashMap::default(), - global_engine_helper_set: None, } } @@ -242,40 +285,13 @@ impl Cluster { } } - pub fn make_global_ffi_helper_set(&mut self) { - let mut engine_store_server = - Box::new(mock_engine_store::EngineStoreServer::new(99999, None)); - let engine_store_server_wrap = Box::new(mock_engine_store::EngineStoreServerWrap::new( - &mut *engine_store_server, - None, - self as *const Cluster as isize, - )); - let engine_store_server_helper = - Box::new(mock_engine_store::gen_engine_store_server_helper( - std::pin::Pin::new(&*engine_store_server_wrap), - )); - - unsafe { - raftstore::engine_store_ffi::init_engine_store_server_helper( - &*engine_store_server_helper - as *const 
raftstore::engine_store_ffi::EngineStoreServerHelper
-                    as *mut u8,
-            );
-        }
-
-        self.global_engine_helper_set = Some(EngineHelperSet {
-            engine_store_server,
-            engine_store_server_wrap,
-            engine_store_server_helper,
-        });
-    }
-
-    pub fn make_ffi_helper_set(
-        &mut self,
+    pub fn make_ffi_helper_set_no_bind(
         id: u64,
         engines: Engines<RocksEngine, RocksEngine>,
         key_mgr: &Option<Arc<DataKeyManager>>,
         router: &RaftRouter<RocksEngine, RocksEngine>,
+        mut node_cfg: TiKvConfig,
+        cluster_id: isize,
     ) -> (FFIHelperSet, TiKvConfig) {
         let proxy = Box::new(raftstore::engine_store_ffi::RaftStoreProxy {
             status: AtomicU8::new(raftstore::engine_store_ffi::RaftProxyStatus::Idle as u8),
@@ -294,14 +310,13 @@ impl<T: Simulator> Cluster<T> {
         let engine_store_server_wrap = Box::new(mock_engine_store::EngineStoreServerWrap::new(
             &mut *engine_store_server,
             Some(&mut *proxy_helper),
-            self as *const Cluster<T> as isize,
+            cluster_id,
         ));
         let engine_store_server_helper =
             Box::new(mock_engine_store::gen_engine_store_server_helper(
                 std::pin::Pin::new(&*engine_store_server_wrap),
             ));
 
-        let mut node_cfg = self.cfg.clone();
         let helper_sz = &*engine_store_server_helper as *const _ as isize;
         node_cfg.raft_store.engine_store_server_helper = helper_sz;
         let ffi_helper_set = FFIHelperSet {
@@ -314,9 +329,25 @@ impl<T: Simulator> Cluster<T> {
         (ffi_helper_set, node_cfg)
     }
 
-    pub fn start(&mut self) -> ServerResult<()> {
-        self.make_global_ffi_helper_set();
+    pub fn make_ffi_helper_set(
+        &mut self,
+        id: u64,
+        engines: Engines<RocksEngine, RocksEngine>,
+        key_mgr: &Option<Arc<DataKeyManager>>,
+        router: &RaftRouter<RocksEngine, RocksEngine>,
+    ) -> (FFIHelperSet, TiKvConfig) {
+        Cluster::<T>::make_ffi_helper_set_no_bind(
+            id,
+            engines,
+            key_mgr,
+            router,
+            self.cfg.clone(),
+            self as *const Cluster<T> as isize,
+        )
+    }
 
+    pub fn start(&mut self) -> ServerResult<()> {
+        init_global_ffi_helper_set();
         // Try recover from last shutdown.
         let node_ids: Vec<u64> = self.engines.iter().map(|(&id, _)| id).collect();
         for node_id in node_ids {
@@ -335,7 +366,7 @@ impl<T: Simulator> Cluster<T> {
         let props = GroupProperties::default();
         tikv_util::thread_group::set_properties(Some(props.clone()));
 
-        let (mut ffi_helper_set, mut node_cfg) =
+        let (mut ffi_helper_set, node_cfg) =
             self.make_ffi_helper_set(0, self.dbs.last().unwrap().clone(), &key_mgr, &router);
 
         let mut sim = self.sim.wl();
@@ -1069,10 +1100,8 @@ impl<T: Simulator> Cluster<T> {
     pub fn must_put_cf(&mut self, cf: &str, key: &[u8], value: &[u8]) {
         match self.batch_put(key, vec![new_put_cf_cmd(cf, key, value)]) {
             Ok(resp) => {
-                if cfg!(feature = "test-raftstore-proxy") {
-                    assert_eq!(resp.get_responses().len(), 1);
-                    assert_eq!(resp.get_responses()[0].get_cmd_type(), CmdType::Put);
-                }
+                assert_eq!(resp.get_responses().len(), 1);
+                assert_eq!(resp.get_responses()[0].get_cmd_type(), CmdType::Put);
             }
             Err(e) => {
                 panic!("has error: {:?}", e);
@@ -1192,6 +1221,7 @@ impl<T: Simulator> Cluster<T> {
     pub fn apply_state(&self, region_id: u64, store_id: u64) -> RaftApplyState {
         let key = keys::apply_state_key(region_id);
+
         self.get_engine(store_id)
             .c()
             .get_msg_cf::<RaftApplyState>(engine_traits::CF_RAFT, &key)
@@ -1442,7 +1472,16 @@ impl<T: Simulator> Cluster<T> {
     pub fn wait_region_split(&mut self, region: &metapb::Region) {
-        self.wait_region_split_max_cnt(region, 20, 250, true);
+        self.wait_region_split_max_cnt(
+            region,
+            20,
+            // The proxy build gets the longer interval; the default build keeps the original 250.
+            if cfg!(feature = "test-raftstore-proxy") {
+                400
+            } else {
+                250
+            },
+            true,
+        );
     }
 
     pub fn wait_region_split_max_cnt(
@@ -1508,9 +1547,14 @@ impl<T: Simulator> Cluster<T> {
     }
 
     pub fn try_merge(&mut self, source: u64, target: u64) -> RaftCmdResponse {
+        let duration = if cfg!(feature = "test-raftstore-proxy") {
+            15
+        } else {
+            5
+        };
         self.call_command_on_leader(
             self.new_prepare_merge(source, target),
-            Duration::from_secs(5),
+
Duration::from_secs(duration), ) .unwrap() } diff --git a/components/test_raftstore/src/pd.rs b/components/test_raftstore/src/pd.rs index 5bbf248c3e7..960ec13bdcc 100644 --- a/components/test_raftstore/src/pd.rs +++ b/components/test_raftstore/src/pd.rs @@ -1070,7 +1070,14 @@ impl TestPdClient { pub fn must_merge(&self, from: u64, target: u64) { self.merge_region(from, target); - self.check_merged_timeout(from, Duration::from_secs(5)); + self.check_merged_timeout( + from, + Duration::from_secs(if cfg!(feature = "test-raftstore-proxy") { + 60 + } else { + 15 + }), + ); } pub fn check_merged(&self, from: u64) -> bool { @@ -1078,11 +1085,16 @@ impl TestPdClient { } pub fn check_merged_timeout(&self, from: u64, duration: Duration) { + let duration2 = if cfg!(feature = "test-raftstore-proxy") { + Duration::from_millis((duration.as_millis() as u64) * 5 as u64) + } else { + duration + }; let timer = Instant::now(); loop { let region = block_on(self.get_region_by_id(from)).unwrap(); if let Some(r) = region { - if timer.elapsed() > duration { + if timer.elapsed() > duration2 { panic!("region {:?} is still not merged.", r); } } else { @@ -1093,8 +1105,17 @@ impl TestPdClient { } pub fn region_leader_must_be(&self, region_id: u64, peer: metapb::Peer) { - for _ in 0..500 { - sleep_ms(10); + let num = if cfg!(feature = "test-raftstore-proxy") { + 3000 + } else { + 1000 + }; + for _ in 0..num { + if cfg!(feature = "test-raftstore-proxy") { + sleep_ms(30); + } else { + sleep_ms(10); + } if let Some(p) = self.cluster.rl().leaders.get(®ion_id) { if *p == peer { return; @@ -1472,7 +1493,7 @@ impl PdClient for TestPdClient { let mut id = pdpb::SplitId::default(); id.set_new_region_id(self.alloc_id().unwrap()); - for peer in region.get_peers() { + for _peer in region.get_peers() { let rid = self.alloc_id().unwrap(); id.mut_new_peer_ids().push(rid); } diff --git a/components/test_raftstore/src/server.rs b/components/test_raftstore/src/server.rs index 94bc5b54307..76f6e1cdaa3 100644 --- a/components/test_raftstore/src/server.rs +++ b/components/test_raftstore/src/server.rs @@ -304,7 +304,7 @@ impl Simulator for ServerCluster { let check_leader_runner = CheckLeaderRunner::new(store_meta.clone()); let check_leader_scheduler = bg_worker.start("check-leader", check_leader_runner); - let mut lock_mgr = LockManager::new(); + let lock_mgr = LockManager::new(); let store = create_raft_storage( engine, &cfg.storage, @@ -429,7 +429,7 @@ impl Simulator for ServerCluster { let simulate_trans = SimulateTransport::new(trans); let server_cfg = Arc::new(VersionTrack::new(cfg.server.clone())); - let pessimistic_txn_cfg = cfg.pessimistic_txn; + let _pessimistic_txn_cfg = cfg.pessimistic_txn; let split_check_runner = SplitCheckRunner::new(engines.kv.clone(), router.clone(), coprocessor_host.clone()); diff --git a/components/test_raftstore/src/util.rs b/components/test_raftstore/src/util.rs index 2d3eb347969..649b8b93b41 100644 --- a/components/test_raftstore/src/util.rs +++ b/components/test_raftstore/src/util.rs @@ -64,7 +64,13 @@ pub fn must_get(engine: &Arc, cf: &str, key: &[u8], value: Option<&[u8]>) { if value.is_none() && res.is_none() { return; } - thread::sleep(Duration::from_millis(20)); + thread::sleep(Duration::from_millis( + if cfg!(feature = "test-raftstore-proxy") { + 40 + } else { + 20 + }, + )); } debug!( "last try to get {} cf {}", @@ -601,7 +607,7 @@ pub fn must_error_read_on_peer( pub fn must_contains_error(resp: &RaftCmdResponse, msg: &str) { let header = resp.get_header(); - assert!(header.has_error()); + 
assert!(header.has_error(), "should have an error containing {:?}", msg);
     let err_msg = header.get_error().get_message();
     assert!(err_msg.contains(msg), "{:?}", resp);
 }
diff --git a/components/tikv_kv/src/lib.rs b/components/tikv_kv/src/lib.rs
index 78d4cf9e917..e73b37f6f63 100644
--- a/components/tikv_kv/src/lib.rs
+++ b/components/tikv_kv/src/lib.rs
@@ -54,7 +54,11 @@ use into_other::IntoOther;
 use tikv_util::time::ThreadReadId;
 
 pub const SEEK_BOUND: u64 = 8;
-const DEFAULT_TIMEOUT_SECS: u64 = 5;
+const DEFAULT_TIMEOUT_SECS: u64 = if cfg!(feature = "test-raftstore-proxy") {
+    15
+} else {
+    5
+};
 
 pub type Callback<T> = Box<dyn FnOnce((CbContext, Result<T>)) + Send>;
 pub type ExtCallback = Box<dyn FnOnce() + Send>;
diff --git a/mock-engine-store/src/lib.rs b/mock-engine-store/src/lib.rs
index 866bef5a7cb..a46d8707829 100644
--- a/mock-engine-store/src/lib.rs
+++ b/mock-engine-store/src/lib.rs
@@ -3,27 +3,55 @@ use engine_store_ffi::interfaces::root::DB as ffi_interfaces;
 use engine_store_ffi::EngineStoreServerHelper;
 use engine_store_ffi::RaftStoreProxyFFIHelper;
 use engine_store_ffi::UnwrapExternCFunc;
+use engine_traits::Peekable;
 use engine_traits::{Engines, SyncMutable};
 use engine_traits::{CF_DEFAULT, CF_LOCK, CF_WRITE};
+use kvproto::raft_serverpb::{
+    MergeState, PeerState, RaftApplyState, RaftLocalState, RaftSnapshotData, RegionLocalState,
+};
 use protobuf::Message;
 use raftstore::engine_store_ffi;
 use std::collections::BTreeMap;
 use std::collections::HashMap;
 use std::pin::Pin;
 use tikv_util::{debug, error, info, warn};
-// use kvproto::raft_serverpb::{
-//     MergeState, PeerState, RaftApplyState, RaftLocalState, RaftSnapshotData, RegionLocalState,
-// };
 
 type RegionId = u64;
 #[derive(Default, Clone)]
 pub struct Region {
     region: kvproto::metapb::Region,
-    peer: kvproto::metapb::Peer,
+    peer: kvproto::metapb::Peer, // The peer of this region on this store
     data: [BTreeMap<Vec<u8>, Vec<u8>>; 3],
     apply_state: kvproto::raft_serverpb::RaftApplyState,
 }
 
+pub fn make_new_region(
+    maybe_region: Option<kvproto::metapb::Region>,
+    maybe_store_id: Option<u64>,
+) -> Region {
+    let mut region = Region {
+        region: maybe_region.unwrap_or_default(),
+        ..Default::default()
+    };
+    if let Some(store_id) = maybe_store_id {
+        set_new_region_peer(&mut region, store_id);
+    }
+    region
+}
+
+fn set_new_region_peer(new_region: &mut Region, store_id: u64) {
+    if let Some(peer) = new_region
+        .region
+        .get_peers()
+        .iter()
+        .find(|&peer| peer.get_store_id() == store_id)
+    {
+        new_region.peer = peer.clone();
+    } else {
+        // No peer for this store in the region meta yet; keep the default peer.
+    }
+}
+
 pub struct EngineStoreServer {
     pub id: u64,
     pub engines: Option<Engines<RocksEngine, RocksEngine>>,
@@ -32,6 +60,7 @@ pub struct EngineStoreServer {
 impl EngineStoreServer {
     pub fn new(id: u64, engines: Option<Engines<RocksEngine, RocksEngine>>) -> Self {
+        // The first region is added in cluster.rs
         EngineStoreServer {
             id,
             engines,
@@ -47,6 +76,27 @@ pub struct EngineStoreServerWrap {
     pub cluster_ptr: isize,
 }
 
+fn hacked_is_real_no_region(region_id: u64, engine_store_server: &mut EngineStoreServer) {
+    if region_id == 1 {
+        // In some tests, region 1 is not created on all nodes after the store is started.
+        // We need to double-check RocksDB before we are sure there is no region 1.
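+        // If CF_RAFT still holds a RegionLocalState for region 1, we rebuild the in-memory
+        // region from it below; if even that record is missing, we panic.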
+        let kv = &mut engine_store_server.engines.as_mut().unwrap().kv;
+        let local_state: Option<RegionLocalState> = kv
+            .get_msg_cf(engine_traits::CF_RAFT, &keys::region_state_key(1))
+            .unwrap_or(None);
+        if local_state.is_none() {
+            panic!("Cannot find region 1 in storage");
+        }
+        engine_store_server.kvstore.insert(
+            region_id,
+            Box::new(make_new_region(
+                Some(local_state.unwrap().get_region().clone()),
+                Some(engine_store_server.id),
+            )),
+        );
+    }
+}
+
 impl EngineStoreServerWrap {
     pub fn new(
         engine_store_server: *mut EngineStoreServer,
@@ -67,21 +117,204 @@ impl EngineStoreServerWrap {
         header: ffi_interfaces::RaftCmdHeader,
     ) -> ffi_interfaces::EngineStoreApplyRes {
         let region_id = header.region_id;
+        let node_id = (*self.engine_store_server).id;
         info!("handle admin raft cmd"; "request"=>?req, "response"=>?resp, "index"=>header.index, "region-id"=>header.region_id);
-        let do_handle_admin_raft_cmd = move |region: &mut Region| {
-            if region.apply_state.get_applied_index() >= header.index {
-                return ffi_interfaces::EngineStoreApplyRes::Persist;
-            }
+        let kv = &mut (*self.engine_store_server).engines.as_mut().unwrap().kv;
+        let do_handle_admin_raft_cmd =
+            move |region: &mut Region, engine_store_server: &mut EngineStoreServer| {
+                if region.apply_state.get_applied_index() >= header.index {
+                    return ffi_interfaces::EngineStoreApplyRes::Persist;
+                }
+                if req.cmd_type == kvproto::raft_cmdpb::AdminCmdType::BatchSplit {
+                    let regions = resp.get_splits().regions.as_ref();
+
+                    for i in 0..regions.len() {
+                        let region_meta = regions.get(i).unwrap();
+                        if region_meta.id == region_id {
+                            // This is the region we split from
+                            assert!(engine_store_server.kvstore.contains_key(&region_meta.id));
+                            engine_store_server
+                                .kvstore
+                                .get_mut(&region_meta.id)
+                                .unwrap()
+                                .region = region_meta.clone();
+                        } else {
+                            // Create the new region produced by the split
+                            let mut new_region =
+                                make_new_region(Some(region_meta.clone()), Some(node_id));
+
+                            debug!(
+                                "new region {} generated by split at node {} with meta {:?}",
+                                region_meta.id, node_id, region_meta
+                            );
+                            new_region
+                                .apply_state
+                                .mut_truncated_state()
+                                .set_index(raftstore::store::RAFT_INIT_LOG_INDEX);
+                            new_region
+                                .apply_state
+                                .mut_truncated_state()
+                                .set_term(raftstore::store::RAFT_INIT_LOG_TERM);
+                            new_region
+                                .apply_state
+                                .set_applied_index(raftstore::store::RAFT_INIT_LOG_INDEX);
+
+                            // No need to split data because all KV are stored in the same RocksDB
+
+                            // We can't assert `region_meta.id` is brand new here
+                            engine_store_server
+                                .kvstore
+                                .insert(region_meta.id, Box::new(new_region));
+                        }
+                    }
+                } else if req.cmd_type == kvproto::raft_cmdpb::AdminCmdType::PrepareMerge {
+                    let tikv_region = resp.get_split().get_left();
+
+                    let target = req.prepare_merge.as_ref().unwrap().target.as_ref();
+                    let region_meta = &mut (engine_store_server
+                        .kvstore
+                        .get_mut(&region_id)
+                        .unwrap()
+                        .region);
+                    let region_epoch = region_meta.region_epoch.as_mut().unwrap();
+
+                    let new_version = region_epoch.version + 1;
+                    region_epoch.set_version(new_version);
+                    assert_eq!(tikv_region.get_region_epoch().get_version(), new_version);
+
+                    let conf_version = region_epoch.conf_ver + 1;
+                    region_epoch.set_conf_ver(conf_version);
+                    assert_eq!(tikv_region.get_region_epoch().get_conf_ver(), conf_version);
+
+                    {
+                        let region = engine_store_server.kvstore.get_mut(&region_id).unwrap();
+                        region.apply_state.set_applied_index(header.index);
+                    }
+                    // We don't handle MergeState and PeerState here
+                } else if req.cmd_type == kvproto::raft_cmdpb::AdminCmdType::CommitMerge {
+                    {
+                        let tikv_region_meta = resp.get_split().get_left();
+
+                        let target_region =
+                            &mut (engine_store_server.kvstore.get_mut(&region_id).unwrap());
+                        let target_region_meta = &mut target_region.region;
+                        let target_version = target_region_meta.get_region_epoch().get_version();
+                        let source_region = req.get_commit_merge().get_source();
+                        let source_version = source_region.get_region_epoch().get_version();
+
+                        let new_version = std::cmp::max(source_version, target_version) + 1;
+                        target_region_meta
+                            .mut_region_epoch()
+                            .set_version(new_version);
+                        assert_eq!(
+                            target_region_meta.get_region_epoch().get_version(),
+                            new_version
+                        );
-
-            ffi_interfaces::EngineStoreApplyRes::Persist
-        };
+
+                        // No need to merge data
+                        let source_at_left = if source_region.get_start_key().is_empty() {
+                            true
+                        } else if target_region_meta.get_start_key().is_empty() {
+                            false
+                        } else {
+                            source_region
+                                .get_end_key()
+                                .cmp(target_region_meta.get_start_key())
+                                == std::cmp::Ordering::Equal
+                        };
+
+                        if source_at_left {
+                            target_region_meta
+                                .set_start_key(source_region.get_start_key().to_vec());
+                            assert_eq!(
+                                tikv_region_meta.get_start_key(),
+                                target_region_meta.get_start_key()
+                            );
+                        } else {
+                            target_region_meta.set_end_key(source_region.get_end_key().to_vec());
+                            assert_eq!(
+                                tikv_region_meta.get_end_key(),
+                                target_region_meta.get_end_key()
+                            );
+                        }
+
+                        {
+                            target_region.apply_state.set_applied_index(header.index);
+                        }
+                    }
+                    {
+                        engine_store_server
+                            .kvstore
+                            .remove(&req.get_commit_merge().get_source().get_id());
+                    }
+                } else if req.cmd_type == kvproto::raft_cmdpb::AdminCmdType::RollbackMerge {
+                    let region = engine_store_server.kvstore.get_mut(&region_id).unwrap();
+                    let region_meta = &mut region.region;
+                    // Roll the epoch version forward, mirroring TiKV's rollback handling.
+                    let new_version = region_meta.get_region_epoch().get_version() + 1;
+                    region_meta.mut_region_epoch().set_version(new_version);
+
+                    region.apply_state.set_applied_index(header.index);
+                } else if req.cmd_type == kvproto::raft_cmdpb::AdminCmdType::ChangePeer
+                    || req.cmd_type == kvproto::raft_cmdpb::AdminCmdType::ChangePeerV2
+                {
+                    let new_region_meta = resp.get_change_peer().get_region();
+
+                    let old_peer_id = {
+                        let old_region = engine_store_server.kvstore.get_mut(&region_id).unwrap();
+                        old_region.region = new_region_meta.clone();
+                        old_region.apply_state.set_applied_index(header.index);
+                        old_region.peer.get_id()
+                    };
+
+                    let mut do_remove = true;
+                    for peer in new_region_meta.get_peers() {
+                        if peer.get_id() == old_peer_id {
+                            // Should not remove region
+                            do_remove = false;
+                        }
+                    }
+                    if do_remove {
+                        let removed = engine_store_server.kvstore.remove(&region_id);
+                        // We need to also remove apply state, thus we need to know peer_id
+                        debug!(
+                            "Remove region {:?} peer_id {} at node {}",
+                            removed.unwrap().region,
+                            old_peer_id,
+                            node_id
+                        );
+                    }
+                } else if [
+                    kvproto::raft_cmdpb::AdminCmdType::CompactLog,
+                    kvproto::raft_cmdpb::AdminCmdType::ComputeHash,
+                    kvproto::raft_cmdpb::AdminCmdType::VerifyHash,
+                ]
+                .contains(&req.cmd_type)
+                {
+                    let region = engine_store_server.kvstore.get_mut(&region_id).unwrap();
+                    region.apply_state.set_applied_index(header.index);
+                }
+                ffi_interfaces::EngineStoreApplyRes::Persist
+            };
+        if !(*self.engine_store_server).kvstore.contains_key(&region_id) {
+            hacked_is_real_no_region(region_id, &mut *self.engine_store_server);
+        }
         match (*self.engine_store_server).kvstore.entry(region_id) {
             std::collections::hash_map::Entry::Occupied(mut o) => {
-                do_handle_admin_raft_cmd(o.get_mut())
+                do_handle_admin_raft_cmd(o.get_mut(), &mut (*self.engine_store_server))
             }
             std::collections::hash_map::Entry::Vacant(v) => {
-                warn!("region {} 
not found", region_id); - do_handle_admin_raft_cmd(v.insert(Default::default())) + warn!( + "handle_admin_raft_cmd region {} not found at node {}", + region_id, node_id + ); + + // do_handle_admin_raft_cmd( + // v.insert(Box::new(make_new_region(None, Some(node_id)))), + // &mut (*self.engine_store_server), + // ) + ffi_interfaces::EngineStoreApplyRes::NotFound } } } @@ -92,22 +325,25 @@ impl EngineStoreServerWrap { header: ffi_interfaces::RaftCmdHeader, ) -> ffi_interfaces::EngineStoreApplyRes { let region_id = header.region_id; + let node_id = (*self.engine_store_server).id; let server = &mut (*self.engine_store_server); let kv = &mut (*self.engine_store_server).engines.as_mut().unwrap().kv; - - let do_handle_write_raft_cmd = move |region: &mut Region| { + let mut do_handle_write_raft_cmd = move |region: &mut Region| { if region.apply_state.get_applied_index() >= header.index { + debug!("handle_write_raft_cmd meet old index"); return ffi_interfaces::EngineStoreApplyRes::None; } + debug!( + "handle_write_raft_cmd region {} node id {}", + region_id, server.id, + ); for i in 0..cmds.len { let key = &*cmds.keys.add(i as _); let val = &*cmds.vals.add(i as _); debug!( - "handle_write_raft_cmd add K {:?} V {:?} to region {} node id {}", + "handle_write_raft_cmd add K {:?} V {:?}", key.to_slice(), val.to_slice(), - region_id, - server.id ); let tp = &*cmds.cmd_types.add(i as _); let cf = &*cmds.cmd_cf.add(i as _); @@ -120,7 +356,8 @@ impl EngineStoreServerWrap { cf_to_name(cf.to_owned().into()), &tikv_key, &val.to_slice().to_vec(), - ); + ) + .map_err(std::convert::identity); } engine_store_ffi::WriteCmdType::Del => { let tikv_key = keys::data_key(key.to_slice()); @@ -128,17 +365,34 @@ impl EngineStoreServerWrap { } } } + region.apply_state.set_applied_index(header.index); + persist_apply_state( + region, + kv, + region_id, + true, + false, + header.index, + header.term, + ); // Do not advance apply index ffi_interfaces::EngineStoreApplyRes::None }; + if !(*self.engine_store_server).kvstore.contains_key(®ion_id) { + hacked_is_real_no_region(region_id, &mut *self.engine_store_server); + } match (*self.engine_store_server).kvstore.entry(region_id) { std::collections::hash_map::Entry::Occupied(mut o) => { do_handle_write_raft_cmd(o.get_mut()) } std::collections::hash_map::Entry::Vacant(v) => { - warn!("region {} not found", region_id); - do_handle_write_raft_cmd(v.insert(Default::default())) + warn!( + "handle_write_raft_cmd region {} not found at node {}", + region_id, node_id + ); + // do_handle_write_raft_cmd(v.insert(Box::new(make_new_region(None, Some(node_id))))) + ffi_interfaces::EngineStoreApplyRes::NotFound } } } @@ -347,41 +601,42 @@ unsafe extern "C" fn ffi_pre_handle_snapshot( term: u64, ) -> ffi_interfaces::RawCppPtr { let store = into_engine_store_server_wrap(arg1); + let node_id = (*store.engine_store_server).id; let proxy_helper = &mut *(store.maybe_proxy_helper.unwrap()); let kvstore = &mut (*store.engine_store_server).kvstore; - let mut req = kvproto::metapb::Region::default(); + let mut region_meta = kvproto::metapb::Region::default(); assert_ne!(region_buff.data, std::ptr::null()); assert_ne!(region_buff.len, 0); - req.merge_from_bytes(region_buff.to_slice()).unwrap(); - - let req_id = req.id; + region_meta + .merge_from_bytes(region_buff.to_slice()) + .unwrap(); - let mut region = Region { - region: req, - peer: Default::default(), - data: Default::default(), - apply_state: Default::default(), - }; + let mut region = make_new_region(Some(region_meta), Some(node_id)); - 
debug!("apply snaps with len {}", snaps.len); + debug!( + "prehandle snapshot with len {} node_id {} peer_id {}", + snaps.len, node_id, peer_id + ); for i in 0..snaps.len { let mut snapshot = snaps.views.add(i as usize); let mut sst_reader = SSTReader::new(proxy_helper, &*(snapshot as *mut ffi_interfaces::SSTView)); { - region.apply_state.set_applied_index(index); region.apply_state.mut_truncated_state().set_index(index); region.apply_state.mut_truncated_state().set_term(term); + { + region.apply_state.set_applied_index(index); + } } while sst_reader.remained() { let key = sst_reader.key(); let value = sst_reader.value(); - let cf_index = (*snapshot).type_ as u8; - let data = &mut region.data[cf_index as usize]; + let cf_index = (*snapshot).type_ as usize; + let data = &mut region.data[cf_index]; let _ = data.insert(key.to_slice().to_vec(), value.to_slice().to_vec()); sst_reader.next(); @@ -401,7 +656,6 @@ pub fn cf_to_name(cf: ffi_interfaces::ColumnFamilyType) -> &'static str { ffi_interfaces::ColumnFamilyType::Lock => CF_LOCK, ffi_interfaces::ColumnFamilyType::Write => CF_WRITE, ffi_interfaces::ColumnFamilyType::Default => CF_DEFAULT, - _ => unreachable!(), } } @@ -416,6 +670,7 @@ unsafe extern "C" fn ffi_apply_pre_handled_snapshot( let req_id = req.region.as_ref().unwrap().region.id; + // Though we do not write to kvstore in memory now, we still need to maintain regions. &(*store.engine_store_server) .kvstore .insert(req_id, Box::new(req.region.take().unwrap())); @@ -425,12 +680,18 @@ unsafe extern "C" fn ffi_apply_pre_handled_snapshot( .get_mut(&req_id) .unwrap(); + debug!( + "apply pre-handled snapshot on new_region {} at store {}", + req_id, node_id + ); + let kv = &mut (*store.engine_store_server).engines.as_mut().unwrap().kv; for cf in 0..3 { for (k, v) in std::mem::take(region.data.as_mut().get_mut(cf).unwrap()).into_iter() { let tikv_key = keys::data_key(k.as_slice()); let cf_name = cf_to_name(cf.into()); - kv.put_cf(cf_name, &tikv_key, &v); + kv.put_cf(cf_name, &tikv_key, &v) + .map_err(std::convert::identity); } } } @@ -447,38 +708,81 @@ unsafe extern "C" fn ffi_handle_ingest_sst( let region_id = header.region_id; let kvstore = &mut (*store.engine_store_server).kvstore; let kv = &mut (*store.engine_store_server).engines.as_mut().unwrap().kv; - let region = kvstore.get_mut(®ion_id).unwrap().as_mut(); - - let index = header.index; - let term = header.term; + let region = kvstore.get_mut(®ion_id).unwrap(); for i in 0..snaps.len { - let mut snapshot = snaps.views.add(i as usize); + let snapshot = snaps.views.add(i as usize); let mut sst_reader = SSTReader::new(proxy_helper, &*(snapshot as *mut ffi_interfaces::SSTView)); while sst_reader.remained() { let key = sst_reader.key(); let value = sst_reader.value(); - - let cf_index = (*snapshot).type_ as u8; - let tikv_key = keys::data_key(key.to_slice()); let cf_name = cf_to_name((*snapshot).type_); - kv.put_cf(cf_name, &tikv_key, &value.to_slice()); + kv.put_cf(cf_name, &tikv_key, &value.to_slice()) + .map_err(std::convert::identity); sst_reader.next(); } } - { - region.apply_state.set_applied_index(index); - region.apply_state.mut_truncated_state().set_index(index); - region.apply_state.mut_truncated_state().set_term(term); - } - + // Since tics#1811, Br/Lightning will always ingest both WRITE and DEFAULT, so we can always persist, rather than wait. 
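+    // Note that this change drops the apply_state/truncated_state updates that used to be
+    // made here; the mock now only reports Persist for the ingest.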
ffi_interfaces::EngineStoreApplyRes::Persist } +fn persist_apply_state( + region: &mut Region, + kv: &mut RocksEngine, + region_id: u64, + persist_apply_index: bool, + persist_truncated_state: bool, + potential_index: u64, + potential_term: u64, +) { + let apply_key = keys::apply_state_key(region_id); + let mut old_apply_state = kv + .get_msg_cf::(engine_traits::CF_RAFT, &apply_key) + .unwrap_or(None); + if old_apply_state.is_none() { + // Have not set apply_state, use ours + kv.put_cf( + engine_traits::CF_RAFT, + &apply_key, + ®ion.apply_state.write_to_bytes().unwrap(), + ) + .map_err(std::convert::identity); + } else { + let old_apply_state = old_apply_state.as_mut().unwrap(); + if persist_apply_index { + old_apply_state.set_applied_index(region.apply_state.get_applied_index()); + if potential_index > old_apply_state.get_commit_index() + || potential_term > old_apply_state.get_commit_term() + { + old_apply_state.set_commit_index(potential_index); + old_apply_state.set_commit_term(potential_term); + region.apply_state.set_commit_index(potential_index); + region.apply_state.set_commit_term(potential_term); + } + } + if persist_truncated_state { + old_apply_state + .mut_truncated_state() + .set_index(region.apply_state.get_truncated_state().get_index()); + old_apply_state + .mut_truncated_state() + .set_term(region.apply_state.get_truncated_state().get_term()); + } + if persist_apply_index || persist_truncated_state { + kv.put_cf( + engine_traits::CF_RAFT, + &apply_key, + &old_apply_state.write_to_bytes().unwrap(), + ) + .map_err(std::convert::identity); + } + } +} + unsafe extern "C" fn ffi_handle_compute_store_stats( arg1: *mut ffi_interfaces::EngineStoreServerWrap, ) -> ffi_interfaces::StoreStats { @@ -495,3 +799,6 @@ unsafe extern "C" fn ffi_handle_compute_store_stats( engine_keys_read: 0, } } + +unsafe impl Sync for EngineStoreServer {} +unsafe impl Sync for EngineStoreServerWrap {} diff --git a/tests/failpoints/cases/mod.rs b/tests/failpoints/cases/mod.rs index f5e979c2c83..9253363a16e 100644 --- a/tests/failpoints/cases/mod.rs +++ b/tests/failpoints/cases/mod.rs @@ -17,6 +17,7 @@ mod test_pending_peers; mod test_replica_read; mod test_replica_stale_read; mod test_server; +mod test_snap; mod test_split_region; mod test_stale_peer; mod test_stale_read; diff --git a/tests/failpoints/cases/test_bootstrap.rs b/tests/failpoints/cases/test_bootstrap.rs index 6cd9a48eaa4..f047a6cdc0c 100644 --- a/tests/failpoints/cases/test_bootstrap.rs +++ b/tests/failpoints/cases/test_bootstrap.rs @@ -11,9 +11,6 @@ fn test_bootstrap_half_way_failure(fp: &str) { let pd_client = Arc::new(TestPdClient::new(0, false)); let sim = Arc::new(RwLock::new(NodeCluster::new(pd_client.clone()))); let mut cluster = Cluster::new(0, 5, sim, pd_client); - unsafe { - test_raftstore::init_cluster_ptr(&cluster); - } // Try to start this node, return after persisted some keys. fail::cfg(fp, "return").unwrap(); diff --git a/tests/failpoints/cases/test_compact_log.rs b/tests/failpoints/cases/test_compact_log.rs index 78cae076dcf..2be572931ce 100644 --- a/tests/failpoints/cases/test_compact_log.rs +++ b/tests/failpoints/cases/test_compact_log.rs @@ -58,7 +58,11 @@ fn test_evict_entry_cache() { fail::cfg("needs_evict_entry_cache", "return").unwrap(); fail::cfg("on_raft_gc_log_tick_1", "off").unwrap(); - sleep_ms(500); // Wait to trigger a raft log compaction. + sleep_ms(if cfg!(feature = "test-raftstore-proxy") { + 700 + } else { + 500 + }); // Wait to trigger a raft log compaction. 
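+    // The proxy build sleeps longer here, in line with the other relaxed timeouts in this change.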
let entry_cache_size = MEMTRACE_ENTRY_CACHE.sum(); // Entries on store 1 will be evict even if they are still in life time. assert!(entry_cache_size < 50 * 1024); diff --git a/tests/failpoints/cases/test_normal.rs b/tests/failpoints/cases/test_normal.rs index 381166c5a23..4de32155c68 100644 --- a/tests/failpoints/cases/test_normal.rs +++ b/tests/failpoints/cases/test_normal.rs @@ -3,20 +3,16 @@ use std::sync::{Arc, RwLock}; use engine_traits::{IterOptions, Iterable, Iterator, Peekable}; -use kvproto::{metapb, raft_serverpb}; use mock_engine_store; use test_raftstore::*; + #[test] fn test_normal() { let pd_client = Arc::new(TestPdClient::new(0, false)); let sim = Arc::new(RwLock::new(NodeCluster::new(pd_client.clone()))); let mut cluster = Cluster::new(0, 3, sim, pd_client); - unsafe { - test_raftstore::init_cluster_ptr(&cluster); - } - // Try to start this node, return after persisted some keys. - let result = cluster.start(); + cluster.run(); let k = b"k1"; let v = b"v1"; diff --git a/tests/failpoints/cases/test_server.rs b/tests/failpoints/cases/test_server.rs index ea8ae8b8eaa..93f5d8f3cc5 100644 --- a/tests/failpoints/cases/test_server.rs +++ b/tests/failpoints/cases/test_server.rs @@ -4,6 +4,22 @@ use pd_client::PdClient; use raft::eraftpb::MessageType; use test_raftstore::*; +fn get_addr(pd_client: &std::sync::Arc, node_id: u64) -> String { + if cfg!(feature = "test-raftstore-proxy") { + pd_client + .get_store(node_id) + .unwrap() + .get_peer_address() + .to_string() + } else { + pd_client + .get_store(node_id) + .unwrap() + .get_address() + .to_string() + } +} + /// When encountering raft/batch_raft mismatch store id error, the service is expected /// to drop connections in order to let raft_client re-resolve store address from PD /// This will make the mismatch error be automatically corrected. 
@@ -23,22 +39,9 @@ fn test_mismatch_store_node() { must_get_equal(&cluster.get_engine(node1_id), b"k1", b"v1"); must_get_equal(&cluster.get_engine(node2_id), b"k1", b"v1"); must_get_equal(&cluster.get_engine(node3_id), b"k1", b"v1"); - let node1_addr = pd_client - .get_store(node1_id) - .unwrap() - .get_address() - .to_string(); - let node2_addr = pd_client - .get_store(node2_id) - .unwrap() - .get_address() - .to_string(); - let node3_addr = cluster - .pd_client - .get_store(node3_id) - .unwrap() - .get_address() - .to_string(); + let node1_addr = get_addr(&pd_client, node1_id); + let node2_addr = get_addr(&pd_client, node2_id); + let node3_addr = get_addr(&pd_client, node3_id); cluster.stop_node(node2_id); cluster.stop_node(node3_id); // run node2 @@ -58,18 +61,9 @@ fn test_mismatch_store_node() { sleep_ms(600); fail::cfg("mock_store_refresh_interval_secs", "return(0)").unwrap(); cluster.must_put(b"k2", b"v2"); - assert_eq!( - node1_addr, - pd_client.get_store(node1_id).unwrap().get_address() - ); - assert_eq!( - node3_addr, - pd_client.get_store(node2_id).unwrap().get_address() - ); - assert_eq!( - node2_addr, - cluster.pd_client.get_store(node3_id).unwrap().get_address() - ); + assert_eq!(node1_addr, get_addr(&pd_client, node1_id)); + assert_eq!(node3_addr, get_addr(&pd_client, node2_id)); + assert_eq!(node2_addr, get_addr(&pd_client, node3_id)); must_get_equal(&cluster.get_engine(node3_id), b"k2", b"v2"); must_get_equal(&cluster.get_engine(node2_id), b"k2", b"v2"); fail::remove("mock_store_refresh_interval_secs"); diff --git a/tests/failpoints/cases/test_snap.rs b/tests/failpoints/cases/test_snap.rs index f1d978f0fdf..01d032c2349 100644 --- a/tests/failpoints/cases/test_snap.rs +++ b/tests/failpoints/cases/test_snap.rs @@ -90,7 +90,14 @@ fn test_server_snapshot_on_resolve_failure() { must_get_none(&engine2, b"k1"); // If snapshot status is reported correctly, sending snapshot should be retried. - notify_rx.recv_timeout(Duration::from_secs(3)).unwrap(); + #[cfg(feature = "test-raftstore-proxy")] + { + notify_rx.recv_timeout(Duration::from_secs(5)).unwrap(); + } + #[cfg(not(feature = "test-raftstore-proxy"))] + { + notify_rx.recv_timeout(Duration::from_secs(3)).unwrap(); + } } #[test] diff --git a/tests/failpoints/cases/test_split_region.rs b/tests/failpoints/cases/test_split_region.rs index b33e644df75..513e0fea00a 100644 --- a/tests/failpoints/cases/test_split_region.rs +++ b/tests/failpoints/cases/test_split_region.rs @@ -15,6 +15,7 @@ use raftstore::Result; use tikv_util::HandyRwLock; use collections::HashMap; +use engine_traits::Peekable; use test_raftstore::*; use tikv_util::config::{ReadableDuration, ReadableSize}; @@ -363,7 +364,7 @@ fn test_split_not_to_split_existing_tombstone_region() { fail::remove(before_check_snapshot_1_2_fp); // Wait for the logs - sleep_ms(100); + sleep_ms(3000); // If left_peer_2 can be created, dropping all msg to make it exist. cluster.add_send_filter(IsolationFilterFactory::new(2)); diff --git a/tests/integrations/raftstore/mod.rs b/tests/integrations/raftstore/mod.rs index c5c129361f7..8c9682fd124 100644 --- a/tests/integrations/raftstore/mod.rs +++ b/tests/integrations/raftstore/mod.rs @@ -1,5 +1,6 @@ // Copyright 2016 TiKV Project Authors. Licensed under Apache-2.0. 
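+// The new test_batch_read_index module below drives the proxy's batch_read_index FFI path directly.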
+mod test_batch_read_index;
 mod test_bootstrap;
 mod test_clear_stale_data;
 mod test_compact_after_delete;
diff --git a/tests/integrations/raftstore/test_batch_read_index.rs b/tests/integrations/raftstore/test_batch_read_index.rs
new file mode 100644
index 00000000000..cba8f7ec270
--- /dev/null
+++ b/tests/integrations/raftstore/test_batch_read_index.rs
@@ -0,0 +1,70 @@
+// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.
+
+use std::sync::{Arc, RwLock};
+
+use engine_rocks::Compat;
+use engine_traits::{IterOptions, Iterable, Iterator, Peekable};
+use kvproto::{kvrpcpb, metapb, raft_serverpb};
+use mock_engine_store;
+use raftstore::engine_store_ffi::*;
+use std::time::Duration;
+use test_raftstore::*;
+
+#[test]
+fn test_batch_read_index() {
+    let pd_client = Arc::new(TestPdClient::new(0, false));
+    let sim = Arc::new(RwLock::new(NodeCluster::new(pd_client.clone())));
+    let mut cluster = Cluster::new(0, 3, sim, pd_client);
+
+    cluster.run();
+
+    let k = b"k1";
+    let v = b"v1";
+    cluster.must_put(k, v);
+
+    let key = *cluster.ffi_helper_set.keys().next().unwrap();
+    let proxy_helper = cluster
+        .ffi_helper_set
+        .get(&key)
+        .unwrap()
+        .proxy_helper
+        .as_ref();
+
+    let mut req = kvrpcpb::ReadIndexRequest::default();
+
+    let region = cluster.get_region(b"k1");
+
+    let mut key_range = kvrpcpb::KeyRange::default();
+    key_range.set_start_key(region.get_start_key().to_vec());
+    key_range.set_end_key(region.get_end_key().to_vec());
+    req.mut_ranges().push(key_range);
+
+    let context = req.mut_context();
+
+    context.set_region_id(region.get_id());
+    context.set_peer(region.get_peers().first().unwrap().clone());
+    context
+        .mut_region_epoch()
+        .set_version(region.get_region_epoch().get_version());
+    context
+        .mut_region_epoch()
+        .set_conf_ver(region.get_region_epoch().get_conf_ver());
+
+    sleep_ms(100);
+    let req_vec = vec![req];
+    let res = unsafe {
+        proxy_helper
+            .proxy_ptr
+            .as_ref()
+            .read_index_client
+            .batch_read_index(req_vec, Duration::from_millis(100))
+    };
+
+    assert_eq!(res.len(), 1);
+    let res = &res[0];
+    // Put (k1,v1) has index 7
+    assert_eq!(res.0.get_read_index(), 7);
+    assert_eq!(res.1, region.get_id());
+
+    cluster.shutdown();
+}
diff --git a/tests/integrations/raftstore/test_bootstrap.rs b/tests/integrations/raftstore/test_bootstrap.rs
index 1259b4f221c..d79acd443e0 100644
--- a/tests/integrations/raftstore/test_bootstrap.rs
+++ b/tests/integrations/raftstore/test_bootstrap.rs
@@ -36,11 +36,12 @@ fn test_bootstrap_idempotent(cluster: &mut Cluster<NodeCluster>) {
 
 #[test]
 fn test_node_bootstrap_with_prepared_data() {
+    test_raftstore::init_global_ffi_helper_set();
     // create a node
     let pd_client = Arc::new(TestPdClient::new(0, false));
     let cfg = new_tikv_config(0);
 
-    let (_, system) = fsm::create_raft_batch_system(&cfg.raft_store);
+    let (router, system) = fsm::create_raft_batch_system(&cfg.raft_store);
     let simulate_trans = SimulateTransport::new(ChannelTransport::new());
     let tmp_path = Builder::new().prefix("test_cluster").tempdir().unwrap();
     let engine = Arc::new(
@@ -56,6 +57,14 @@ fn test_node_bootstrap_with_prepared_data() {
         RocksEngine::from_db(Arc::clone(&engine)),
         RocksEngine::from_db(Arc::clone(&raft_engine)),
     );
+    let (ffi_helper_set, cfg) = Cluster::<NodeCluster>::make_ffi_helper_set_no_bind(
+        0,
+        engines.clone(),
+        &None,
+        &router,
+        cfg,
+        0,
+    );
     let tmp_mgr = Builder::new().prefix("test_cluster").tempdir().unwrap();
     let bg_worker = WorkerBuilder::new("background").thread_count(2).create();
     let mut node = Node::new(
diff --git 
a/tests/integrations/raftstore/test_compact_lock_cf.rs b/tests/integrations/raftstore/test_compact_lock_cf.rs index 703e49169ef..195bc2d62a4 100644 --- a/tests/integrations/raftstore/test_compact_lock_cf.rs +++ b/tests/integrations/raftstore/test_compact_lock_cf.rs @@ -5,6 +5,8 @@ use engine_traits::{MiscExt, CF_LOCK}; use test_raftstore::*; use tikv_util::config::*; +use engine_traits::KvEngine; + fn flush(cluster: &mut Cluster) { for engines in cluster.engines.values() { engines.kv.flush_cf(CF_LOCK, true).unwrap(); @@ -13,8 +15,16 @@ fn flush(cluster: &mut Cluster) { fn flush_then_check(cluster: &mut Cluster, interval: u64, written: bool) { flush(cluster); + // Wait for compaction. - sleep_ms(interval * 2); + sleep_ms( + interval + * if cfg!(feature = "test-raftstore-proxy") { + 3 + } else { + 2 + }, + ); for engines in cluster.engines.values() { let compact_write_bytes = engines .kv diff --git a/tests/integrations/raftstore/test_early_apply.rs b/tests/integrations/raftstore/test_early_apply.rs index cb58bb9b1dc..412e6a7d2c4 100644 --- a/tests/integrations/raftstore/test_early_apply.rs +++ b/tests/integrations/raftstore/test_early_apply.rs @@ -1,6 +1,7 @@ // Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0. use engine_rocks::RocksSnapshot; +use protobuf::Message; use raft::eraftpb::MessageType; use raftstore::store::*; use std::time::*; @@ -22,6 +23,12 @@ enum DataLost { AllLost, } +static DURATION: u64 = if cfg!(feature = "test-raftstore-proxy") { + 8 +} else { + 3 +}; + fn test(cluster: &mut Cluster, action: A, check: C, mode: DataLost) where A: FnOnce(&mut Cluster), @@ -38,13 +45,14 @@ where cluster.add_send_filter(CloneFilterFactory(filter)); let last_index = cluster.raft_local_state(1, 1).get_last_index(); action(cluster); - cluster.wait_last_index(1, 1, last_index + 1, Duration::from_secs(3)); + cluster.wait_last_index(1, 1, last_index + 1, Duration::from_secs(DURATION)); let mut snaps = vec![]; snaps.push((1, RocksSnapshot::new(cluster.get_raft_engine(1)))); + if mode == DataLost::AllLost { - cluster.wait_last_index(1, 2, last_index + 1, Duration::from_secs(3)); + cluster.wait_last_index(1, 2, last_index + 1, Duration::from_secs(DURATION)); snaps.push((2, RocksSnapshot::new(cluster.get_raft_engine(2)))); - cluster.wait_last_index(1, 3, last_index + 1, Duration::from_secs(3)); + cluster.wait_last_index(1, 3, last_index + 1, Duration::from_secs(DURATION)); snaps.push((3, RocksSnapshot::new(cluster.get_raft_engine(3)))); } cluster.clear_send_filters(); @@ -153,7 +161,7 @@ fn test_update_internal_apply_index() { cluster.async_put(b"k2", b"v2").unwrap(); let mut snaps = vec![]; for i in 1..3 { - cluster.wait_last_index(1, i, last_index + 2, Duration::from_secs(3)); + cluster.wait_last_index(1, i, last_index + 2, Duration::from_secs(DURATION)); snaps.push((i, RocksSnapshot::new(cluster.get_raft_engine(1)))); } cluster.clear_send_filters(); diff --git a/tests/integrations/raftstore/test_hibernate.rs b/tests/integrations/raftstore/test_hibernate.rs index daa40d4bcaa..ea38ffc20d6 100644 --- a/tests/integrations/raftstore/test_hibernate.rs +++ b/tests/integrations/raftstore/test_hibernate.rs @@ -11,6 +11,12 @@ use raft::eraftpb::{ConfChangeType, MessageType}; use test_raftstore::*; use tikv_util::HandyRwLock; +const INTERVAL_TIMES: u32 = if cfg!(feature = "test-raftstore-proxy") { + 5 +} else { + 2 +}; + #[test] fn test_proposal_prevent_sleep() { let mut cluster = new_node_cluster(0, 3); @@ -299,7 +305,7 @@ fn test_inconsistent_configuration() { })) .when(filter.clone()), )); 
- thread::sleep(cluster.cfg.raft_store.raft_heartbeat_interval() * 2); + thread::sleep(cluster.cfg.raft_store.raft_heartbeat_interval() * INTERVAL_TIMES); assert!(!awakened.load(Ordering::SeqCst)); // Simulate rolling disable hibernate region in followers @@ -317,7 +323,7 @@ fn test_inconsistent_configuration() { ); awakened.store(false, Ordering::SeqCst); filter.store(true, Ordering::SeqCst); - thread::sleep(cluster.cfg.raft_store.raft_heartbeat_interval() * 2); + thread::sleep(cluster.cfg.raft_store.raft_heartbeat_interval() * INTERVAL_TIMES); // Leader should keep awake as peer 3 won't agree to sleep. assert!(awakened.load(Ordering::SeqCst)); cluster.reset_leader_of_region(1); @@ -396,7 +402,7 @@ fn test_hibernate_feature_gate() { ); awakened.store(false, Ordering::SeqCst); filter.store(true, Ordering::SeqCst); - thread::sleep(cluster.cfg.raft_store.raft_heartbeat_interval() * 2); + thread::sleep(cluster.cfg.raft_store.raft_heartbeat_interval() * INTERVAL_TIMES); // Leader can go to sleep as version requirement is met. assert!(!awakened.load(Ordering::SeqCst)); } diff --git a/tests/integrations/raftstore/test_merge.rs b/tests/integrations/raftstore/test_merge.rs index d9205674cc3..f86410a8534 100644 --- a/tests/integrations/raftstore/test_merge.rs +++ b/tests/integrations/raftstore/test_merge.rs @@ -190,6 +190,14 @@ fn test_node_merge_with_slow_learner() { #[cfg(feature = "protobuf-codec")] #[test] fn test_node_merge_prerequisites_check() { + let get_global = if cfg!(feature = "test-raftstore-proxy") { + // This test can print too much log, so disable log here + let get_global = ::slog_global::get_global(); + ::slog_global::clear_global(); + Some(get_global) + } else { + None + }; let mut cluster = new_node_cluster(0, 3); configure_for_merge(&mut cluster); let pd_client = Arc::clone(&cluster.pd_client); @@ -265,6 +273,10 @@ fn test_node_merge_prerequisites_check() { cluster.clear_send_filters(); cluster.must_put(b"k24", b"v24"); must_get_equal(&cluster.get_engine(3), b"k24", b"v24"); + + if cfg!(feature = "test-raftstore-proxy") { + ::slog_global::set_global((*(get_global.unwrap())).clone()); + } } /// Test if stale peer will be handled properly after merge. 
@@ -584,6 +596,7 @@ fn test_node_merge_brain_split() {
 
 /// Test whether approximate size and keys are updated after merge
 #[test]
+#[cfg(not(feature = "test-raftstore-proxy"))]
 fn test_merge_approximate_size_and_keys() {
     let mut cluster = new_node_cluster(0, 3);
     cluster.cfg.raft_store.split_region_check_tick_interval = ReadableDuration::millis(20);
diff --git a/tests/integrations/raftstore/test_single.rs b/tests/integrations/raftstore/test_single.rs
index 41285f734ed..3e9a1e277c9 100644
--- a/tests/integrations/raftstore/test_single.rs
+++ b/tests/integrations/raftstore/test_single.rs
@@ -127,6 +127,7 @@ fn test_node_delete() {
     test_delete(&mut cluster);
 }
 
+#[cfg(not(feature = "test-raftstore-proxy"))]
 #[test]
 fn test_node_use_delete_range() {
     let mut cluster = new_node_cluster(0, 1);
@@ -137,6 +138,7 @@ fn test_node_use_delete_range() {
     test_delete_range(&mut cluster, CF_WRITE);
 }
 
+#[cfg(not(feature = "test-raftstore-proxy"))]
 #[test]
 fn test_node_not_use_delete_range() {
     let mut cluster = new_node_cluster(0, 1);
@@ -192,11 +194,15 @@ fn test_node_apply_no_op() {
     let timer = Instant::now();
     loop {
         let state = cluster.apply_state(1, 1);
+        // When a new leader is elected, it should apply one no-op entry
         if state.get_applied_index() > RAFT_INIT_LOG_INDEX {
             break;
         }
         if timer.elapsed() > Duration::from_secs(3) {
-            panic!("apply no-op log not finish after 3 seconds");
+            panic!(
+                "applying the no-op log did not finish after 3 seconds, now {}",
+                state.get_applied_index()
+            );
         }
         sleep_ms(10);
     }
diff --git a/tests/integrations/raftstore/test_snap.rs b/tests/integrations/raftstore/test_snap.rs
index 53f82eb3399..cf7c1b783c0 100644
--- a/tests/integrations/raftstore/test_snap.rs
+++ b/tests/integrations/raftstore/test_snap.rs
@@ -444,6 +444,7 @@ fn test_node_snapshot_with_append() {
 }
 
 #[test]
+// #[cfg(not(feature = "test-raftstore-proxy"))]
 fn test_server_snapshot_with_append() {
     let mut cluster = new_server_cluster(0, 4);
     test_snapshot_with_append(&mut cluster);
diff --git a/tests/integrations/raftstore/test_split_region.rs b/tests/integrations/raftstore/test_split_region.rs
index d648d32ac5d..c0b531d0cf9 100644
--- a/tests/integrations/raftstore/test_split_region.rs
+++ b/tests/integrations/raftstore/test_split_region.rs
@@ -213,9 +213,15 @@ fn test_auto_split_region(cluster: &mut Cluster<T>) {
     let epoch = left.get_region_epoch().clone();
     let get = new_request(left.get_id(), epoch, vec![new_get_cmd(&max_key)], false);
-    let resp = cluster
-        .call_command_on_leader(get, Duration::from_secs(5))
-        .unwrap();
+    let timeout = if cfg!(feature = "test-raftstore-proxy") {
+        10
+    } else {
+        5
+    };
+    let resp = cluster
+        .call_command_on_leader(get, Duration::from_secs(timeout))
+        .unwrap();
     assert!(resp.get_header().has_error());
     assert!(resp.get_header().get_error().has_key_not_in_region());
 }
diff --git a/tests/integrations/server/status_server.rs b/tests/integrations/server/status_server.rs
index f34cf44c9b9..770718876de 100644
--- a/tests/integrations/server/status_server.rs
+++ b/tests/integrations/server/status_server.rs
@@ -45,14 +45,12 @@ fn test_region_meta_endpoint() {
     assert!(router.is_some());
 
     let mut status_server = unsafe {
-        let helperset = &*cluster
-            .global_engine_helper_set
+        let helperset = &test_raftstore::get_global_engine_helper_set()
             .as_ref()
             .unwrap()
             .engine_store_server_helper;
-        let helperptr = helperset as *const EngineStoreServerHelper;
         StatusServer::new(
-            &*helperptr,
+            &*helperset,
             1,
             None,
             ConfigController::default(),