From 79da4a9419b34e39a382ef627793ee4c3fe23f49 Mon Sep 17 00:00:00 2001 From: Calvin Neo Date: Fri, 12 Apr 2024 18:30:46 +0800 Subject: [PATCH] Add some tests for replica read (#369) --- proxy_tests/proxy/shared/replica_read.rs | 122 ++++++++++++++++++++++- 1 file changed, 120 insertions(+), 2 deletions(-) diff --git a/proxy_tests/proxy/shared/replica_read.rs b/proxy_tests/proxy/shared/replica_read.rs index 4af131c767d..1f991a6cf82 100644 --- a/proxy_tests/proxy/shared/replica_read.rs +++ b/proxy_tests/proxy/shared/replica_read.rs @@ -7,7 +7,7 @@ use engine_store_ffi::ffi::{ ProtoMsgBaseBuff, }; -use crate::utils::v1::*; +use crate::{shared::ffi, utils::v1::*}; #[derive(Default)] struct GcMonitor { @@ -145,7 +145,7 @@ extern "C" fn ffi_wake(data: RawVoidPtr) { } #[test] -fn test_read_index() { +fn test_read_index_normal() { // Initialize cluster let (mut cluster, pd_client) = new_mock_cluster(0, 3); configure_for_lease_read(&mut cluster, Some(50), Some(10_000)); @@ -208,6 +208,124 @@ fn test_read_index() { cluster.shutdown(); } +/// If a read index request is received while region state is Applying, +/// it could be handled correctly. +#[test] +fn test_read_index_applying() { + // Initialize cluster + let (mut cluster, pd_client) = new_mock_cluster(0, 2); + configure_for_lease_read(&mut cluster, Some(50), Some(10_000)); + cluster.cfg.raft_store.raft_heartbeat_ticks = 1; + cluster.cfg.raft_store.raft_log_compact_sync_interval = ReadableDuration::millis(500); + pd_client.disable_default_operator(); + disable_auto_gen_compact_log(&mut cluster); + // Otherwise will panic with `assert_eq!(apply_state, last_applied_state)`. + fail::cfg("on_pre_write_apply_state", "return(true)").unwrap(); + // Set region and peers + let r1 = cluster.run_conf_change(); + let p1 = new_peer(1, 1); + let p2 = new_learner_peer(2, 2); + + cluster.pd_client.must_add_peer(r1, p2.clone()); + cluster.must_put(b"k0", b"v"); + { + let prev_state = maybe_collect_states(&cluster.cluster_ext, r1, Some(vec![1])); + let (compact_index, compact_term) = get_valid_compact_index_by(&prev_state, Some(vec![1])); + } + cluster.pd_client.must_none_pending_peer(p2.clone()); + // assert_eq!(cluster.pd_client.get_pending_peers().len(), 0); + let region = cluster.get_region(b"k0"); + assert_eq!(cluster.leader_of_region(region.get_id()).unwrap(), p1); + + check_key(&cluster, b"k0", b"v", Some(true), None, Some(vec![1])); + + fail::cfg("region_apply_snap", "return").unwrap(); + cluster.add_send_filter(CloneFilterFactory( + RegionPacketFilter::new(r1, 2) + .msg_type(MessageType::MsgAppend) + .direction(Direction::Both), + )); + + for i in 1..5 { + cluster.must_put(format!("k{}", i).as_bytes(), b"v"); + } + + check_key(&cluster, b"k4", b"v", Some(true), None, Some(vec![1])); + + { + let prev_state = collect_all_states(&cluster.cluster_ext, r1); + let (compact_index, compact_term) = get_valid_compact_index_by(&prev_state, Some(vec![1])); + let compact_log = test_raftstore::new_compact_log_request(compact_index, compact_term); + let req = test_raftstore::new_admin_request(r1, region.get_region_epoch(), compact_log); + let res = cluster + .call_command_on_leader(req, Duration::from_secs(3)) + .unwrap(); + assert!(!res.get_header().has_error(), "{:?}", res); + } + + cluster.must_put(b"kz", b"v"); + check_key(&cluster, b"kz", b"v", Some(true), None, Some(vec![1])); + check_key(&cluster, b"k1", b"v", Some(false), None, Some(vec![2])); + + // Wait until gc. + std::thread::sleep(std::time::Duration::from_millis(1500)); + + cluster.clear_send_filters(); + + let waker = Waker::new(); + + for (id, peer, f) in &[(2, p2, true)] { + iter_ffi_helpers( + &cluster, + Some(vec![*id]), + &mut |_, ffi_helper: &mut FFIHelperSet| { + assert_eq!( + general_get_region_local_state( + &ffi_helper.engine_store_server.engines.as_ref().unwrap().kv, + r1 + ) + .unwrap() + .get_state(), + PeerState::Applying + ); + let mut request = kvproto::kvrpcpb::ReadIndexRequest::default(); + + { + let context = request.mut_context(); + context.set_region_id(region.get_id()); + context.set_peer(peer.clone()); + context.set_region_epoch(region.get_region_epoch().clone()); + request.set_start_ts(666); + + let mut range = kvproto::kvrpcpb::KeyRange::default(); + range.set_start_key(region.get_start_key().to_vec()); + range.set_end_key(region.get_end_key().to_vec()); + request.mut_ranges().push(range); + + debug!("make read index request {:?}", &request); + } + let w = if *f { Some(&waker) } else { None }; + let resp = blocked_read_index(&request, &*ffi_helper.proxy_helper, w).unwrap(); + assert_ne!(resp.get_read_index(), 0); + debug!("resp detail {:?}", resp); + assert!(!resp.has_region_error()); + assert!(!resp.has_locked()); + }, + ); + } + + drop(waker); + + { + assert!(!GC_MONITOR.is_empty()); + assert!(GC_MONITOR.valid_clean()); + } + + cluster.shutdown(); + fail::remove("on_pre_write_apply_state"); + fail::remove("region_apply_snap"); +} + #[test] fn test_util() { // test timer