diff --git a/Cargo.toml b/Cargo.toml
index 887344667af..bf7d1c97b03 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -454,7 +454,7 @@ codegen-units = 4
 
 [profile.test]
 opt-level = 0
-debug = true
+debug = 0
 codegen-units = 16
 lto = false
 incremental = true
diff --git a/components/raftstore/src/store/peer.rs b/components/raftstore/src/store/peer.rs
index 904d35fec2f..c0277cbc5a6 100644
--- a/components/raftstore/src/store/peer.rs
+++ b/components/raftstore/src/store/peer.rs
@@ -1704,6 +1704,7 @@ where
             "msg_size" => msg.get_message().compute_size(),
             "to" => to_peer_id,
             "disk_usage" => ?msg.get_disk_usage(),
+            "msg" => ?msg
         );
 
         for (term, index) in msg
diff --git a/proxy_components/mock-engine-store/src/mock_cluster/v1/cluster.rs b/proxy_components/mock-engine-store/src/mock_cluster/v1/cluster.rs
index e2d62d4a9c6..62e03331a82 100644
--- a/proxy_components/mock-engine-store/src/mock_cluster/v1/cluster.rs
+++ b/proxy_components/mock-engine-store/src/mock_cluster/v1/cluster.rs
@@ -447,6 +447,10 @@ impl> Cluster {
         }
     }
 
+    pub fn get_router(&self, node_id: u64) -> Option> {
+        self.sim.rl().get_router(node_id)
+    }
+
     fn valid_leader_id(&self, region_id: u64, leader_id: u64) -> bool {
         let store_ids = match self.voter_store_ids_of_region(region_id) {
             None => return false,
diff --git a/proxy_components/mock-engine-store/src/mock_store/mock_engine_store_server.rs b/proxy_components/mock-engine-store/src/mock_store/mock_engine_store_server.rs
index 969c28af033..1f769af8d17 100644
--- a/proxy_components/mock-engine-store/src/mock_store/mock_engine_store_server.rs
+++ b/proxy_components/mock-engine-store/src/mock_store/mock_engine_store_server.rs
@@ -359,6 +359,7 @@ unsafe extern "C" fn ffi_release_pre_handled_snapshot(
 pub fn gen_engine_store_server_helper(
     wrap: Pin<&EngineStoreServerWrap>,
 ) -> EngineStoreServerHelper {
+    info!("mock gen_engine_store_server_helper");
     EngineStoreServerHelper {
         magic_number: interfaces_ffi::RAFT_STORE_PROXY_MAGIC_NUMBER,
         version: interfaces_ffi::RAFT_STORE_PROXY_VERSION,
diff --git a/proxy_components/proxy_ffi/src/read_index_helper.rs b/proxy_components/proxy_ffi/src/read_index_helper.rs
index 604345c9bd4..02b2666c67d 100644
--- a/proxy_components/proxy_ffi/src/read_index_helper.rs
+++ b/proxy_components/proxy_ffi/src/read_index_helper.rs
@@ -82,7 +82,7 @@ fn into_read_index_response(
     resp
 }
 
-fn gen_read_index_raft_cmd_req(req: &mut ReadIndexRequest) -> RaftCmdRequest {
+pub fn gen_read_index_raft_cmd_req(req: &mut ReadIndexRequest) -> RaftCmdRequest {
     let region_id = req.get_context().get_region_id();
     let mut cmd = RaftCmdRequest::default();
     {
diff --git a/proxy_tests/proxy/shared/parse_raft_message.rs b/proxy_tests/proxy/shared/parse_raft_message.rs
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/proxy_tests/proxy/shared/replica_read.rs b/proxy_tests/proxy/shared/replica_read.rs
index 1f991a6cf82..5dfb2e1b58c 100644
--- a/proxy_tests/proxy/shared/replica_read.rs
+++ b/proxy_tests/proxy/shared/replica_read.rs
@@ -356,3 +356,473 @@ fn test_util() {
     }
     assert!(GC_MONITOR.valid_clean());
 }
+
+use kvproto::{
+    kvrpcpb::{Context, DiskFullOpt, KeyRange},
+    raft_cmdpb::{CmdType, RaftCmdRequest, RaftRequestHeader, Request as RaftRequest},
+    raft_serverpb::RaftMessage,
+};
+use raftstore::{
+    router::RaftStoreRouter,
+    store::{msg::Callback, RaftCmdExtraOpts, ReadIndexContext},
+};
+use tokio::sync::oneshot;
+use txn_types::{Key, Lock, LockType, TimeStamp};
+use uuid::Uuid;
+
+use crate::utils::v1_server::{new_server_cluster, ChannelBuilder, Environment, TikvClient};
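+
+// The tests below exercise how a ReadIndex updates the concurrency manager's max_ts:
+// - `test_raft_cmd_request_cant_advance_max_ts`: a ReadIndex RaftCmdRequest sent to the
+//   leader, where max_ts is expected to stay unchanged.
+// - `test_raft_cmd_request_learner_advance_max_ts`: a request built by
+//   `gen_read_index_raft_cmd_req` and sent to a learner, where max_ts advances and
+//   memory locks are reported back.
+// - `test_raft_message_can_advance_max_ts`: a raw MsgReadIndex raft message carrying a
+//   `ReadIndexContext` entry.
+// - `concurrent_update_maxts_and_commit`: a ReadIndex racing with an async-commit
+//   prewrite around the `before_calculate_min_commit_ts` and
+//   `after_calculate_min_commit_ts` fail points.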
+
+// https://github.com/tikv/tikv/issues/16823
+#[test]
+fn test_raft_cmd_request_cant_advance_max_ts() {
+    use kvproto::kvrpcpb::{ReadIndexRequest, ReadIndexResponse};
+
+    let mut cluster = new_server_cluster(0, 1);
+    cluster.run();
+
+    let cm = cluster.sim.read().unwrap().get_concurrency_manager(1);
+
+    let region = cluster.get_region(b"");
+    let leader = region.get_peers()[0].clone();
+    let addr = cluster.sim.rl().get_addr(leader.get_store_id()).to_owned();
+
+    let env = Arc::new(Environment::new(1));
+    let channel = ChannelBuilder::new(env).connect(&addr);
+    let client = TikvClient::new(channel);
+
+    let mut ctx = Context::default();
+    let region_id = leader.get_id();
+    ctx.set_region_id(leader.get_id());
+    ctx.set_region_epoch(region.get_region_epoch().clone());
+    ctx.set_peer(leader);
+
+    let read_index = |ranges: &[(&[u8], &[u8])]| {
+        let start_ts = block_on(cluster.pd_client.get_tso()).unwrap();
+
+        let mut cmd = RaftCmdRequest::default();
+        {
+            let mut header = RaftRequestHeader::default();
+            let mut inner_req = RaftRequest::default();
+            inner_req.set_cmd_type(CmdType::ReadIndex);
+            inner_req
+                .mut_read_index()
+                .set_start_ts(start_ts.into_inner());
+
+            let mut req = ReadIndexRequest::default();
+            let start_ts = block_on(cluster.pd_client.get_tso()).unwrap();
+            req.set_context(ctx.clone());
+            req.set_start_ts(start_ts.into_inner());
+            for &(start_key, end_key) in ranges {
+                let mut range = KeyRange::default();
+                range.set_start_key(start_key.to_vec());
+                range.set_end_key(end_key.to_vec());
+                req.mut_ranges().push(range);
+            }
+
+            header.set_region_id(region_id);
+            header.set_peer(req.get_context().get_peer().clone());
+            header.set_region_epoch(req.get_context().get_region_epoch().clone());
+            cmd.set_header(header);
+            cmd.set_requests(vec![inner_req].into());
+        }
+
+        let (result_tx, result_rx) = oneshot::channel();
+        let router = cluster.get_router(1).unwrap();
+        if let Err(e) = router.send_command(
+            cmd,
+            Callback::read(Box::new(move |resp| {
+                result_tx.send(resp.response).unwrap();
+            })),
+            RaftCmdExtraOpts {
+                deadline: None,
+                disk_full_opt: DiskFullOpt::AllowedOnAlmostFull,
+            },
+        ) {
+            panic!("router send msg failed, error: {}", e);
+        }
+
+        let resp = block_on(result_rx).unwrap();
+        (resp.get_responses()[0].get_read_index().clone(), start_ts)
+    };
+
+    // wait a while until the node updates its own max ts
+    std::thread::sleep(Duration::from_millis(300));
+
+    let prev_cm_max_ts = cm.max_ts();
+    let (resp, start_ts) = read_index(&[(b"l", b"yz")]);
+    assert!(!resp.has_locked());
+    // max_ts is not advanced by this request
+    assert_eq!(cm.max_ts(), prev_cm_max_ts);
+    assert_ne!(cm.max_ts(), start_ts);
+    cluster.shutdown();
+    fail::remove("on_pre_write_apply_state")
+}
+
+// https://github.com/tikv/tikv/pull/8669/files
+#[test]
+fn test_raft_cmd_request_learner_advance_max_ts() {
+    use kvproto::kvrpcpb::{ReadIndexRequest, ReadIndexResponse};
+
+    let mut cluster = new_server_cluster(0, 2);
+    cluster.pd_client.disable_default_operator();
+    let region_id = cluster.run_conf_change();
+    let region = cluster.get_region(b"");
+    assert_eq!(region_id, 1);
+    assert_eq!(region.get_id(), 1);
+    let leader = region.get_peers()[0].clone();
+
+    fail::cfg("on_pre_write_apply_state", "return(true)").unwrap();
+    let learner = new_learner_peer(2, 2);
+    cluster.pd_client.must_add_peer(1, learner.clone());
+
+    let cm = cluster.sim.read().unwrap().get_concurrency_manager(1);
+    let keys: Vec<_> = vec![b"k", b"l"]
+        .into_iter()
+        .map(|k| Key::from_raw(k))
+        .collect();
+    let guards = block_on(cm.lock_keys(keys.iter()));
+    let lock = Lock::new(
+        LockType::Put,
+        b"k".to_vec(),
+        1.into(),
+        20000,
+        None,
+        1.into(),
+        1,
+        2.into(),
+        false,
+    );
+    guards[0].with_lock(|l| *l = Some(lock.clone()));
+
+    let addr = cluster.sim.rl().get_addr(learner.get_store_id()).to_owned();
+
+    let env = Arc::new(Environment::new(1));
+    let channel = ChannelBuilder::new(env).connect(&addr);
+    let client = TikvClient::new(channel);
+
+    // cluster.must_put(b"k", b"v");
+
+    let read_index = |ranges: &[(&[u8], &[u8])]| {
+        let start_ts = block_on(cluster.pd_client.get_tso()).unwrap();
+
+        // https://github.com/pingcap/tiflash/blob/14a127820d0530e496af624bb5b69acd48caf747/dbms/src/Storages/KVStore/Read/ReadIndex.cpp#L39
+        let mut ctx = Context::default();
+        let learner = learner.clone();
+        ctx.set_region_id(region_id);
+        ctx.set_region_epoch(region.get_region_epoch().clone());
+        ctx.set_peer(learner);
+        let mut read_index_request = ReadIndexRequest::default();
+        read_index_request.set_context(ctx);
+        read_index_request.set_start_ts(start_ts.into_inner());
+        for (s, e) in ranges {
+            let mut r = KeyRange::new();
+            r.set_start_key(s.to_vec());
+            r.set_end_key(e.to_vec());
+            read_index_request.mut_ranges().push(r);
+        }
+        let cmd =
+            proxy_ffi::read_index_helper::gen_read_index_raft_cmd_req(&mut read_index_request);
+
+        let (result_tx, result_rx) = oneshot::channel();
+        let router = cluster.get_router(2).unwrap();
+        if let Err(e) = router.send_command(
+            cmd,
+            Callback::read(Box::new(move |resp| {
+                result_tx.send(resp.response).unwrap();
+            })),
+            RaftCmdExtraOpts {
+                deadline: None,
+                disk_full_opt: DiskFullOpt::AllowedOnAlmostFull,
+            },
+        ) {
+            panic!("router send msg failed, error: {}", e);
+        }
+
+        let resp = block_on(result_rx).unwrap();
+        (resp.get_responses()[0].get_read_index().clone(), start_ts)
+    };
+
+    // wait a while until the node updates its own max ts
+    std::thread::sleep(Duration::from_millis(3000));
+
+    must_wait_until_cond_node(
+        &cluster.cluster_ext,
+        region_id,
+        None,
+        &|states: &States| -> bool {
+            states.in_disk_region_state.get_region().get_peers().len() == 2
+        },
+    );
+
+    let prev_cm_max_ts = cm.max_ts();
+    let (resp, start_ts) = read_index(&[(b"l", b"yz")]);
+    assert!(!resp.has_locked());
+    // max_ts should be advanced to the request's start_ts
+    assert_ne!(cm.max_ts(), prev_cm_max_ts);
+    assert_eq!(cm.max_ts(), start_ts);
+
+    // `gen_read_index_raft_cmd_req` can only handle one key-range
+    let (resp, start_ts) = read_index(&[(b"j", b"k0")]);
+    assert_eq!(resp.get_locked(), &lock.into_lock_info(b"k".to_vec()));
+    assert_eq!(cm.max_ts(), start_ts);
+
+    drop(guards);
+
+    let (resp, start_ts) = read_index(&[(b"a", b"z")]);
+    assert!(!resp.has_locked());
+    assert_eq!(cm.max_ts(), start_ts);
+    cluster.shutdown();
+    fail::remove("on_pre_write_apply_state")
+}
+
+#[test]
+fn test_raft_message_can_advance_max_ts() {
+    use kvproto::raft_cmdpb::{ReadIndexRequest, ReadIndexResponse};
+    let mut cluster = new_server_cluster(0, 1);
+    cluster.run();
+
+    let cm = cluster.sim.read().unwrap().get_concurrency_manager(1);
+
+    let region = cluster.get_region(b"");
+    let leader = region.get_peers()[0].clone();
+    let follower = new_learner_peer(2, 2);
+    let addr = cluster.sim.rl().get_addr(leader.get_store_id()).to_owned();
+
+    let env = Arc::new(Environment::new(1));
+    let channel = ChannelBuilder::new(env).connect(&addr);
+    let client = TikvClient::new(channel);
+
+    let region_id = leader.get_id();
+
+    let read_index = |ranges: &[(&[u8], &[u8])]| {
+        let mut m = raft::eraftpb::Message::default();
+        m.set_msg_type(MessageType::MsgReadIndex);
+        let mut read_index_req = ReadIndexRequest::default();
+        let start_ts = block_on(cluster.pd_client.get_tso()).unwrap();
+        read_index_req.set_start_ts(start_ts.into_inner());
+        for &(start_key, end_key) in ranges {
+            let mut range = KeyRange::default();
+            range.set_start_key(start_key.to_vec());
+            range.set_end_key(end_key.to_vec());
+            read_index_req.mut_key_ranges().push(range);
+        }
+
+        let rctx = ReadIndexContext {
+            id: Uuid::new_v4(),
+            request: Some(read_index_req),
+            locked: None,
+        };
+        let mut e = raft::eraftpb::Entry::default();
+        e.set_data(rctx.to_bytes().into());
+        m.mut_entries().push(e);
+        m.set_from(2);
+
+        let mut raft_msg = kvproto::raft_serverpb::RaftMessage::default();
+        raft_msg.set_region_id(region.get_id());
+        raft_msg.set_from_peer(follower);
+        raft_msg.set_to_peer(leader);
+        raft_msg.set_region_epoch(region.get_region_epoch().clone());
+        raft_msg.set_message(m);
+        cluster.send_raft_msg(raft_msg).unwrap();
+
+        (ReadIndexResponse::default(), start_ts)
+    };
+
+    // send the ReadIndex message and give the leader a while to handle it
+    let prev_cm_max_ts = cm.max_ts();
+    let (resp, start_ts) = read_index(&[(b"l", b"yz")]);
+    cluster.must_put(b"a", b"b");
+    std::thread::sleep(Duration::from_millis(2000));
+    // assert!(!resp.has_locked());
+    // max_ts should be advanced to the message's start_ts
+    assert_ne!(cm.max_ts(), prev_cm_max_ts);
+    assert_eq!(cm.max_ts(), start_ts);
+    cluster.shutdown();
+    fail::remove("on_pre_write_apply_state")
+}
+
+fn concurrent_update_maxts_and_commit(read_index_in_middle: bool) {
+    use kvproto::kvrpcpb::{Mutation, Op, ReadIndexRequest, ReadIndexResponse};
+    use test_raftstore::{
+        must_kv_commit, must_kv_prewrite, must_kv_prewrite_with, must_kv_read_equal, new_mutation,
+    };
+    let mut cluster = new_server_cluster(0, 2);
+    cluster.pd_client.disable_default_operator();
+    let region_id = cluster.run_conf_change();
+    let region = cluster.get_region(b"");
+    assert_eq!(region_id, 1);
+    assert_eq!(region.get_id(), 1);
+    let leader = region.get_peers()[0].clone();
+
+    let cm = cluster.sim.read().unwrap().get_concurrency_manager(1);
+    fail::cfg("on_pre_write_apply_state", "return(true)").unwrap();
+    let learner = new_learner_peer(2, 2);
+    cluster.pd_client.must_add_peer(1, learner.clone());
+
+    let addr = cluster.sim.rl().get_addr(leader.get_store_id()).to_owned();
+
+    let env = Arc::new(Environment::new(1));
+    let channel = ChannelBuilder::new(env).connect(&addr);
+    let client = TikvClient::new(channel);
+
+    let region_id = leader.get_id();
+
+    let mut ctx = Context::default();
+    ctx.set_region_id(region_id);
+    ctx.set_peer(leader.clone());
+    ctx.set_region_epoch(region.get_region_epoch().clone());
+
+    let read_index = |ranges: &[(&[u8], &[u8])], start_ts: u64| {
+        // https://github.com/pingcap/tiflash/blob/14a127820d0530e496af624bb5b69acd48caf747/dbms/src/Storages/KVStore/Read/ReadIndex.cpp#L39
+        let mut ctx = Context::default();
+        let learner = learner.clone();
+        ctx.set_region_id(region_id);
+        ctx.set_region_epoch(region.get_region_epoch().clone());
+        ctx.set_peer(learner);
+        let mut read_index_request = ReadIndexRequest::default();
+        read_index_request.set_context(ctx);
+        read_index_request.set_start_ts(start_ts);
+        for (s, e) in ranges {
+            let mut r = KeyRange::new();
+            r.set_start_key(s.to_vec());
+            r.set_end_key(e.to_vec());
+            read_index_request.mut_ranges().push(r);
+        }
+        let cmd =
+            proxy_ffi::read_index_helper::gen_read_index_raft_cmd_req(&mut read_index_request);
+
+        let (result_tx, result_rx) = oneshot::channel();
+        let router = cluster.get_router(2).unwrap();
+        if let Err(e) = router.send_command(
+            cmd,
+            Callback::read(Box::new(move |resp| {
+                result_tx.send(resp.response).unwrap();
+            })),
+            RaftCmdExtraOpts {
+                deadline: None,
+                disk_full_opt: DiskFullOpt::AllowedOnAlmostFull,
+            },
+        ) {
+            panic!("router send msg failed, error: {}", e);
+        }
+
+        let resp = block_on(result_rx).unwrap();
+        (resp.get_responses()[0].get_read_index().clone(), start_ts)
+    };
+
+    let cli = {
+        let env = Arc::new(Environment::new(1));
+        let channel = ChannelBuilder::new(env).connect(&addr);
+        TikvClient::new(channel)
+    };
+
+    if !read_index_in_middle {
+        // read index -> calculate_min_commit_ts -> write lock
+        fail::cfg("before_calculate_min_commit_ts", "pause").unwrap();
+    } else {
+        // calculate_min_commit_ts -> read index -> write lock
+        fail::cfg("after_calculate_min_commit_ts", "pause").unwrap();
+    }
+    let mut prewrite_resp = {
+        use kvproto::kvrpcpb::PrewriteRequest;
+        let mut prewrite_req = PrewriteRequest::default();
+        prewrite_req.set_context(ctx.clone());
+        prewrite_req.set_mutations(
+            vec![new_mutation(Op::Put, &b"key2"[..], &b"value1"[..])]
+                .into_iter()
+                .collect(),
+        );
+        prewrite_req.primary_lock = b"key2".to_vec();
+        prewrite_req.start_version = 10;
+        prewrite_req.lock_ttl = 3000;
+        prewrite_req.for_update_ts = 0;
+        prewrite_req.min_commit_ts = prewrite_req.start_version + 1;
+        prewrite_req.use_async_commit = true;
+        prewrite_req.try_one_pc = false;
+        client.kv_prewrite_async(&prewrite_req).unwrap()
+    };
+    // Wait until the prewrite reaches the paused fail point
+    std::thread::sleep(Duration::from_millis(2000));
+    let prev_cm_max_ts = cm.max_ts();
+    let (resp, start_ts) = read_index(&[(b"", b"")], 1112);
+    // Wait for the read index to finish
+    std::thread::sleep(Duration::from_millis(2000));
+    assert_ne!(cm.max_ts(), prev_cm_max_ts);
+    assert_eq!(cm.max_ts().into_inner(), start_ts);
+    fail::remove("before_calculate_min_commit_ts");
+    fail::remove("after_calculate_min_commit_ts");
+
+    let pre_resp = prewrite_resp.receive_sync();
+    info!("prewrite response is {:?}", pre_resp);
+
+    {
+        use engine_traits::KvEngine;
+        use tikv::storage::{kv::SnapContext, mvcc::MvccReader, Engine};
+
+        let leader_id = leader.get_id();
+        info!("leader_id is {}", leader_id);
+
+        let mut c = Context::default();
+        c.set_region_id(region.get_id());
+        c.set_region_epoch(region.get_region_epoch().clone());
+        c.set_peer(leader.clone());
+        c.set_replica_read(true);
+        let mut range = KeyRange::default();
+        let raw_key = b"key2";
+        let encoded_key = Key::from_raw(raw_key);
+        range.set_start_key(encoded_key.as_encoded().to_vec());
+        let snap_c = SnapContext {
+            pb_ctx: &c,
+            // start_ts: Some(1112.into()),
+            start_ts: None,
+            key_ranges: vec![range],
+            ..Default::default()
+        };
+
+        let mut engine = cluster.sim.rl().storages[&leader_id].clone();
+        let snapshot = engine.snapshot(snap_c).unwrap();
+        let mut reader = MvccReader::new(snapshot, None, true);
+        let lock = reader
+            .load_lock(&Key::from_raw(&b"key2"[..]))
+            .unwrap()
+            .unwrap();
+        info!("loaded lock {:?}", lock);
+        // assert_eq!(lock.ts, start_ts.into());
+        assert!(!lock.is_pessimistic_lock());
+        if read_index_in_middle {
+            assert!(resp.has_locked());
+            assert_eq!(lock.min_commit_ts.into_inner(), 11);
+        } else {
+            assert_eq!(lock.min_commit_ts.into_inner(), 1113);
+        }
+    }
+}
+
+#[test]
+fn test_concurrent_update_maxts_and_commit_middle() {
+    concurrent_update_maxts_and_commit(true);
+}
+
+#[test]
+fn test_concurrent_update_maxts_and_commit_before() {
+    concurrent_update_maxts_and_commit(false);
+}
+
+#[test]
+fn test_parse_raft_read_index_message() {
+    let s = "1660283F2EAC474F841FDFF38775DC57723D08C28090A7CEC4FB9D0612310A1B7480000000000000FF9A5F728000000000FFD4C1F90000000000FA12127480000000000000FF9B00000000000000F8";
+    let x = raftstore::store::ReadIndexContext::parse(&hex::decode(s).unwrap()).unwrap();
+    info!("{:?}", x.request);
+}
\ No newline at end of file
diff --git a/src/server/raftkv/mod.rs b/src/server/raftkv/mod.rs
index 9f42925b6d4..4406d0ed94a 100644
--- a/src/server/raftkv/mod.rs
+++ b/src/server/raftkv/mod.rs
@@ -801,11 +801,13 @@ impl ReadIndexObserver for ReplicaReadLockChecker {
             return;
         }
         assert_eq!(msg.get_entries().len(), 1);
+        info!("ReplicaReadLockChecker handles MsgReadIndex");
         let mut rctx = ReadIndexContext::parse(msg.get_entries()[0].get_data()).unwrap();
         if let Some(mut request) = rctx.request.take() {
             let begin_instant = Instant::now();
 
             let start_ts = request.get_start_ts().into();
+            info!("ReplicaReadLockChecker updates max_ts to {}", start_ts);
             self.concurrency_manager.update_max_ts(start_ts);
             for range in request.mut_key_ranges().iter_mut() {
                 let key_bound = |key: Vec<u8>| {
@@ -834,6 +836,7 @@ impl ReadIndexObserver for ReplicaReadLockChecker {
                     },
                 );
                 if let Err(txn_types::Error(box txn_types::ErrorInner::KeyIsLocked(lock))) = res {
+                    info!("ReplicaReadLockChecker found lock {:?}", lock);
                     rctx.locked = Some(lock);
                     REPLICA_READ_LOCK_CHECK_HISTOGRAM_VEC_STATIC
                         .locked
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index 13d868849f4..b1d3910f49e 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -1543,6 +1543,7 @@ impl Storage {
 
         match &cmd {
             Command::Prewrite(Prewrite { mutations, .. }) => {
+                info!("prewrite mutations {:?}", mutations);
                 let keys = mutations.iter().map(|m| m.key().as_encoded());
                 Self::check_api_version(
                     self.api_version,
diff --git a/src/storage/txn/actions/prewrite.rs b/src/storage/txn/actions/prewrite.rs
index 64e22a13585..ba6f39501f4 100644
--- a/src/storage/txn/actions/prewrite.rs
+++ b/src/storage/txn/actions/prewrite.rs
@@ -551,6 +551,7 @@ impl<'a> PrewriteMutation<'a> {
             lock.secondaries = secondary_keys.to_owned();
         }
 
+        fail_point!("before_calculate_min_commit_ts");
         let final_min_commit_ts = if lock.use_async_commit || try_one_pc {
             let res = async_commit_timestamps(
                 &self.key,
@@ -566,6 +567,7 @@ impl<'a> PrewriteMutation<'a> {
                 lock.use_async_commit = false;
                 lock.secondaries = Vec::new();
             }
+            info!("final_min_commit_ts {:?}", res);
             res
         } else {
             Ok(TimeStamp::zero())