Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add log for current ReadIndex mechanism #370

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,10 @@ impl<T: Simulator<TiFlashEngine>> Cluster<T> {
}
}

pub fn get_router(&self, node_id: u64) -> Option<RaftRouter<TiFlashEngine, ProxyRaftEngine>> {
self.sim.rl().get_router(node_id)
}

fn valid_leader_id(&self, region_id: u64, leader_id: u64) -> bool {
let store_ids = match self.voter_store_ids_of_region(region_id) {
None => return false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ unsafe extern "C" fn ffi_release_pre_handled_snapshot(
pub fn gen_engine_store_server_helper(
wrap: Pin<&EngineStoreServerWrap>,
) -> EngineStoreServerHelper {
info!("mock gen_engine_store_server_helper");
EngineStoreServerHelper {
magic_number: interfaces_ffi::RAFT_STORE_PROXY_MAGIC_NUMBER,
version: interfaces_ffi::RAFT_STORE_PROXY_VERSION,
Expand Down
2 changes: 1 addition & 1 deletion proxy_components/proxy_ffi/src/read_index_helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ fn into_read_index_response<S: engine_traits::Snapshot>(
resp
}

fn gen_read_index_raft_cmd_req(req: &mut ReadIndexRequest) -> RaftCmdRequest {
pub fn gen_read_index_raft_cmd_req(req: &mut ReadIndexRequest) -> RaftCmdRequest {
let region_id = req.get_context().get_region_id();
let mut cmd = RaftCmdRequest::default();
{
Expand Down
284 changes: 284 additions & 0 deletions proxy_tests/proxy/shared/replica_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,3 +356,287 @@ fn test_util() {
}
assert!(GC_MONITOR.valid_clean());
}

use kvproto::{
kvrpcpb::{Context, DiskFullOpt, KeyRange},
raft_cmdpb::{CmdType, RaftCmdRequest, RaftRequestHeader, Request as RaftRequest},
raft_serverpb::RaftMessage,
};
use raftstore::{
router::RaftStoreRouter,
store::{msg::Callback, RaftCmdExtraOpts, ReadIndexContext},
};
use tokio::sync::oneshot;
use txn_types::{Key, Lock, LockType, TimeStamp};
use uuid::Uuid;

use crate::utils::v1_server::{new_server_cluster, ChannelBuilder, Environment, TikvClient};

// https://github.com/tikv/tikv/issues/16823
#[test]
fn test_raft_cmd_request_cant_advanve_max_ts() {
use kvproto::kvrpcpb::{ReadIndexRequest, ReadIndexResponse};

let mut cluster = new_server_cluster(0, 1);
cluster.run();

let cm = cluster.sim.read().unwrap().get_concurrency_manager(1);

let region = cluster.get_region(b"");
let leader = region.get_peers()[0].clone();
let addr = cluster.sim.rl().get_addr(leader.get_store_id()).to_owned();

let env = Arc::new(Environment::new(1));
let channel = ChannelBuilder::new(env).connect(&addr);
let client = TikvClient::new(channel);

let mut ctx = Context::default();
let region_id = leader.get_id();
ctx.set_region_id(leader.get_id());
ctx.set_region_epoch(region.get_region_epoch().clone());
ctx.set_peer(leader);

let read_index = |ranges: &[(&[u8], &[u8])]| {
let start_ts = block_on(cluster.pd_client.get_tso()).unwrap();

let mut cmd = RaftCmdRequest::default();
{
let mut header = RaftRequestHeader::default();
let mut inner_req = RaftRequest::default();
inner_req.set_cmd_type(CmdType::ReadIndex);
inner_req
.mut_read_index()
.set_start_ts(start_ts.into_inner());

let mut req = ReadIndexRequest::default();
let start_ts = block_on(cluster.pd_client.get_tso()).unwrap();
req.set_context(ctx.clone());
req.set_start_ts(start_ts.into_inner());
for &(start_key, end_key) in ranges {
let mut range = KeyRange::default();
range.set_start_key(start_key.to_vec());
range.set_end_key(end_key.to_vec());
req.mut_ranges().push(range);
}

header.set_region_id(region_id);
header.set_peer(req.get_context().get_peer().clone());
header.set_region_epoch(req.get_context().get_region_epoch().clone());
cmd.set_header(header);
cmd.set_requests(vec![inner_req].into());
}

let (result_tx, result_rx) = oneshot::channel();
let router = cluster.get_router(1).unwrap();
if let Err(e) = router.send_command(
cmd,
Callback::read(Box::new(move |resp| {
result_tx.send(resp.response).unwrap();
})),
RaftCmdExtraOpts {
deadline: None,
disk_full_opt: DiskFullOpt::AllowedOnAlmostFull,
},
) {
panic!("router send msg failed, error: {}", e);
}

let resp = block_on(result_rx).unwrap();
(resp.get_responses()[0].get_read_index().clone(), start_ts)
};

// wait a while until the node updates its own max ts
std::thread::sleep(Duration::from_millis(300));

let prev_cm_max_ts = cm.max_ts();
let (resp, start_ts) = read_index(&[(b"l", b"yz")]);
assert!(!resp.has_locked());
// Actually not changed
assert_eq!(cm.max_ts(), prev_cm_max_ts);
assert_ne!(cm.max_ts(), start_ts);
cluster.shutdown();
fail::remove("on_pre_write_apply_state")
}

// https://github.com/tikv/tikv/pull/8669/files
#[test]
fn test_raft_cmd_request_learner_advanve_max_ts() {
use kvproto::kvrpcpb::{ReadIndexRequest, ReadIndexResponse};

let mut cluster = new_server_cluster(0, 2);
cluster.pd_client.disable_default_operator();
let region_id = cluster.run_conf_change();
let region = cluster.get_region(b"");
assert_eq!(region_id, 1);
assert_eq!(region.get_id(), 1);
let leader = region.get_peers()[0].clone();

fail::cfg("on_pre_write_apply_state", "return(true)").unwrap();
let learner = new_learner_peer(2, 2);
cluster.pd_client.must_add_peer(1, learner.clone());

let cm = cluster.sim.read().unwrap().get_concurrency_manager(1);
let keys: Vec<_> = vec![b"k", b"l"]
.into_iter()
.map(|k| Key::from_raw(k))
.collect();
let guards = block_on(cm.lock_keys(keys.iter()));
let lock = Lock::new(
LockType::Put,
b"k".to_vec(),
1.into(),
20000,
None,
1.into(),
1,
2.into(),
false,
);
guards[0].with_lock(|l| *l = Some(lock.clone()));

let addr = cluster.sim.rl().get_addr(learner.get_store_id()).to_owned();

let env = Arc::new(Environment::new(1));
let channel = ChannelBuilder::new(env).connect(&addr);
let client = TikvClient::new(channel);

// cluster.must_put(b"k", b"v");

let read_index = |ranges: &[(&[u8], &[u8])]| {
let start_ts = block_on(cluster.pd_client.get_tso()).unwrap();

// https://github.com/pingcap/tiflash/blob/14a127820d0530e496af624bb5b69acd48caf747/dbms/src/Storages/KVStore/Read/ReadIndex.cpp#L39
let mut ctx = Context::default();
let learner = learner.clone();
ctx.set_region_id(region_id);
ctx.set_region_epoch(region.get_region_epoch().clone());
ctx.set_peer(learner);
let mut read_index_request = ReadIndexRequest::default();
read_index_request.set_context(ctx);
read_index_request.set_start_ts(start_ts.into_inner());
for (s, e) in ranges {
let mut r = KeyRange::new();
r.set_start_key(s.to_vec());
r.set_end_key(e.to_vec());
read_index_request.mut_ranges().push(r);
}
let mut cmd =
proxy_ffi::read_index_helper::gen_read_index_raft_cmd_req(&mut read_index_request);

let (result_tx, result_rx) = oneshot::channel();
let router = cluster.get_router(2).unwrap();
if let Err(e) = router.send_command(
cmd,
Callback::read(Box::new(move |resp| {
result_tx.send(resp.response).unwrap();
})),
RaftCmdExtraOpts {
deadline: None,
disk_full_opt: DiskFullOpt::AllowedOnAlmostFull,
},
) {
panic!("router send msg failed, error: {}", e);
}

let resp = block_on(result_rx).unwrap();
(resp.get_responses()[0].get_read_index().clone(), start_ts)
};

// wait a while until the node updates its own max ts
std::thread::sleep(Duration::from_millis(3000));

must_wait_until_cond_node(
&cluster.cluster_ext,
region_id,
None,
&|states: &States| -> bool {
states.in_disk_region_state.get_region().get_peers().len() == 2
},
);

let prev_cm_max_ts = cm.max_ts();
let (resp, start_ts) = read_index(&[(b"l", b"yz")]);
assert!(!resp.has_locked());
// Actually not changed
assert_ne!(cm.max_ts(), prev_cm_max_ts);
assert_eq!(cm.max_ts(), start_ts);

// `gen_read_index_raft_cmd_req` can only handle one key-range
let (resp, start_ts) = read_index(&[(b"j", b"k0")]);
assert_eq!(resp.get_locked(), &lock.into_lock_info(b"k".to_vec()));
assert_eq!(cm.max_ts(), start_ts);

drop(guards);

let (resp, start_ts) = read_index(&[(b"a", b"z")]);
assert!(!resp.has_locked());
assert_eq!(cm.max_ts(), start_ts);
cluster.shutdown();
fail::remove("on_pre_write_apply_state")
}

#[test]
fn test_raft_message_can_advanve_max_ts() {
use kvproto::raft_cmdpb::{ReadIndexRequest, ReadIndexResponse};
let mut cluster = new_server_cluster(0, 1);
cluster.run();

let cm = cluster.sim.read().unwrap().get_concurrency_manager(1);

let region = cluster.get_region(b"");
let leader = region.get_peers()[0].clone();
let follower = new_learner_peer(2, 2);
let addr = cluster.sim.rl().get_addr(leader.get_store_id()).to_owned();

let env = Arc::new(Environment::new(1));
let channel = ChannelBuilder::new(env).connect(&addr);
let client = TikvClient::new(channel);

let region_id = leader.get_id();

let read_index = |ranges: &[(&[u8], &[u8])]| {
let mut m = raft::eraftpb::Message::default();
m.set_msg_type(MessageType::MsgReadIndex);
let mut read_index_req = ReadIndexRequest::default();
let start_ts = block_on(cluster.pd_client.get_tso()).unwrap();
read_index_req.set_start_ts(start_ts.into_inner());
for &(start_key, end_key) in ranges {
let mut range = KeyRange::default();
range.set_start_key(start_key.to_vec());
range.set_end_key(end_key.to_vec());
read_index_req.mut_key_ranges().push(range);
}

let rctx = ReadIndexContext {
id: Uuid::new_v4(),
request: Some(read_index_req),
locked: None,
};
let mut e = raft::eraftpb::Entry::default();
e.set_data(rctx.to_bytes().into());
m.mut_entries().push(e);
m.set_from(2);

let mut raft_msg = kvproto::raft_serverpb::RaftMessage::default();
raft_msg.set_region_id(region.get_id());
raft_msg.set_from_peer(follower);
raft_msg.set_to_peer(leader);
raft_msg.set_region_epoch(region.get_region_epoch().clone());
raft_msg.set_message(m);
cluster.send_raft_msg(raft_msg).unwrap();

(ReadIndexResponse::default(), start_ts)
};

// wait a while until the node updates its own max ts
let prev_cm_max_ts = cm.max_ts();
let (resp, start_ts) = read_index(&[(b"l", b"yz")]);
cluster.must_put(b"a", b"b");
std::thread::sleep(Duration::from_millis(2000));
// assert!(!resp.has_locked());
// Actually not changed
assert_ne!(cm.max_ts(), prev_cm_max_ts);
assert_eq!(cm.max_ts(), start_ts);
cluster.shutdown();
fail::remove("on_pre_write_apply_state")
}
Loading