Skip to content

Commit 72d5717

Browse files
SpadeA-Tangdbsid
authored andcommitted
raftstore: make manual compaction in cleanup worker be able to be ignored dynamically (tikv#16547)
close tikv#15282 make manual compaction in cleanup worker be able to be ignored dynamically Signed-off-by: SpadeA-Tang <[email protected]> Signed-off-by: dbsid <[email protected]>
1 parent b5e0563 commit 72d5717

File tree

7 files changed

+120
-41
lines changed

7 files changed

+120
-41
lines changed

components/raftstore/src/store/config.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -414,6 +414,11 @@ pub struct Config {
414414
#[doc(hidden)]
415415
#[online_config(hidden)]
416416
pub min_pending_apply_region_count: u64,
417+
418+
/// Whether to skip manual compaction in the clean up worker for `write` and
419+
/// `default` column family
420+
#[doc(hidden)]
421+
pub skip_manual_compaction_in_clean_up_worker: bool,
417422
}
418423

419424
impl Default for Config {
@@ -552,6 +557,7 @@ impl Default for Config {
552557
enable_v2_compatible_learner: false,
553558
unsafe_disable_check_quorum: false,
554559
min_pending_apply_region_count: 10,
560+
skip_manual_compaction_in_clean_up_worker: false,
555561
}
556562
}
557563
}

components/raftstore/src/store/fsm/store.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1704,7 +1704,12 @@ impl<EK: KvEngine, ER: RaftEngine> RaftBatchSystem<EK, ER> {
17041704
ReadRunner::new(self.router.clone(), engines.raft.clone()),
17051705
);
17061706

1707-
let compact_runner = CompactRunner::new(engines.kv.clone(), bgworker_remote);
1707+
let compact_runner = CompactRunner::new(
1708+
engines.kv.clone(),
1709+
bgworker_remote,
1710+
cfg.clone().tracker(String::from("compact-runner")),
1711+
cfg.value().skip_manual_compaction_in_clean_up_worker,
1712+
);
17081713
let cleanup_sst_runner = CleanupSstRunner::new(Arc::clone(&importer));
17091714
let gc_snapshot_runner = GcSnapshotRunner::new(
17101715
meta.get_id(),

components/raftstore/src/store/worker/compact.rs

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,20 @@ use std::{
88
time::Duration,
99
};
1010

11-
use engine_traits::{KvEngine, ManualCompactionOptions, RangeStats, CF_WRITE};
11+
use engine_traits::{KvEngine, ManualCompactionOptions, RangeStats, CF_LOCK, CF_WRITE};
1212
use fail::fail_point;
1313
use futures_util::compat::Future01CompatExt;
1414
use thiserror::Error;
1515
use tikv_util::{
16-
box_try, debug, error, info, time::Instant, timer::GLOBAL_TIMER_HANDLE, warn, worker::Runnable,
16+
box_try, config::Tracker, debug, error, info, time::Instant, timer::GLOBAL_TIMER_HANDLE, warn,
17+
worker::Runnable,
1718
};
1819
use yatp::Remote;
1920

2021
use super::metrics::{
2122
COMPACT_RANGE_CF, FULL_COMPACT, FULL_COMPACT_INCREMENTAL, FULL_COMPACT_PAUSE,
2223
};
24+
use crate::store::Config;
2325

2426
type Key = Vec<u8>;
2527

@@ -214,14 +216,27 @@ pub enum Error {
214216
pub struct Runner<E> {
215217
engine: E,
216218
remote: Remote<yatp::task::future::TaskCell>,
219+
cfg_tracker: Tracker<Config>,
220+
// Whether to skip the manual compaction of write and default comlumn family.
221+
skip_compact: bool,
217222
}
218223

219224
impl<E> Runner<E>
220225
where
221226
E: KvEngine,
222227
{
223-
pub fn new(engine: E, remote: Remote<yatp::task::future::TaskCell>) -> Runner<E> {
224-
Runner { engine, remote }
228+
pub fn new(
229+
engine: E,
230+
remote: Remote<yatp::task::future::TaskCell>,
231+
cfg_tracker: Tracker<Config>,
232+
skip_compact: bool,
233+
) -> Runner<E> {
234+
Runner {
235+
engine,
236+
remote,
237+
cfg_tracker,
238+
skip_compact,
239+
}
225240
}
226241

227242
/// Periodic full compaction.
@@ -369,6 +384,21 @@ where
369384
bottommost_level_force,
370385
} => {
371386
let cf = &cf_name;
387+
if cf != CF_LOCK {
388+
// check whether the config changed for ignoring manual compaction
389+
if let Some(incoming) = self.cfg_tracker.any_new() {
390+
self.skip_compact = incoming.skip_manual_compaction_in_clean_up_worker;
391+
}
392+
if self.skip_compact {
393+
info!(
394+
"skip compact range";
395+
"range_start" => start_key.as_ref().map(|k| log_wrappers::Value::key(k)),
396+
"range_end" => end_key.as_ref().map(|k|log_wrappers::Value::key(k)),
397+
"cf" => cf_name,
398+
);
399+
return;
400+
}
401+
}
372402
if let Err(e) = self.compact_range_cf(
373403
cf,
374404
start_key.as_deref(),
@@ -498,7 +528,10 @@ mod tests {
498528
E: KvEngine,
499529
{
500530
let pool = YatpPoolBuilder::new(DefaultTicker::default()).build_future_pool();
501-
(pool.clone(), Runner::new(engine, pool.remote().clone()))
531+
(
532+
pool.clone(),
533+
Runner::new(engine, pool.remote().clone(), Tracker::default(), false),
534+
)
502535
}
503536

504537
#[test]

components/test_raftstore-v2/src/node.rs

Lines changed: 3 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,7 @@ impl<EK: KvEngine> Simulator<EK> for NodeCluster<EK> {
311311
let (sender, _) = mpsc::unbounded();
312312
let bg_worker = WorkerBuilder::new("background").thread_count(2).create();
313313
let state: Arc<Mutex<GlobalReplicationState>> = Arc::default();
314+
let store_config = Arc::new(VersionTrack::new(raft_store));
314315
node.start(
315316
raft_engine.clone(),
316317
tablet_registry,
@@ -324,7 +325,7 @@ impl<EK: KvEngine> Simulator<EK> for NodeCluster<EK> {
324325
CollectorRegHandle::new_for_test(),
325326
bg_worker,
326327
pd_worker,
327-
Arc::new(VersionTrack::new(raft_store)),
328+
store_config.clone(),
328329
&state,
329330
importer,
330331
key_manager,
@@ -338,27 +339,11 @@ impl<EK: KvEngine> Simulator<EK> for NodeCluster<EK> {
338339
);
339340
assert!(node_id == 0 || node_id == node.id());
340341
let node_id = node.id();
341-
342-
let region_split_size = cfg.coprocessor.region_split_size();
343-
let enable_region_bucket = cfg.coprocessor.enable_region_bucket();
344-
let region_bucket_size = cfg.coprocessor.region_bucket_size;
345-
let mut raftstore_cfg = cfg.tikv.raft_store;
346-
raftstore_cfg.optimize_for(true);
347-
raftstore_cfg
348-
.validate(
349-
region_split_size,
350-
enable_region_bucket,
351-
region_bucket_size,
352-
true,
353-
)
354-
.unwrap();
355-
356-
let raft_store = Arc::new(VersionTrack::new(raftstore_cfg));
357342
cfg_controller.register(
358343
Module::Raftstore,
359344
Box::new(RaftstoreConfigManager::new(
360345
node.refresh_config_scheduler(),
361-
raft_store,
346+
store_config,
362347
)),
363348
);
364349

components/test_raftstore/src/common-test.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ store-io-pool-size = 0
7373
apply-pool-size = 1
7474
store-pool-size = 1
7575
snap-generator-pool-size = 2
76+
skip-manual-compaction-in-clean_up-worker = false
7677
[coprocessor]
7778

7879
[rocksdb]

components/test_raftstore/src/node.rs

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -246,10 +246,11 @@ impl<EK: KvEngine> Simulator<EK> for NodeCluster<EK> {
246246
)
247247
.unwrap();
248248
let bg_worker = WorkerBuilder::new("background").thread_count(2).create();
249+
let store_config = Arc::new(VersionTrack::new(raft_store));
249250
let mut node = Node::new(
250251
system,
251252
&cfg.server,
252-
Arc::new(VersionTrack::new(raft_store)),
253+
store_config.clone(),
253254
cfg.storage.api_version(),
254255
Arc::clone(&self.pd_client),
255256
Arc::default(),
@@ -353,25 +354,11 @@ impl<EK: KvEngine> Simulator<EK> for NodeCluster<EK> {
353354
.map(|p| p.path().to_str().unwrap().to_owned())
354355
);
355356

356-
let region_split_size = cfg.coprocessor.region_split_size();
357-
let enable_region_bucket = cfg.coprocessor.enable_region_bucket();
358-
let region_bucket_size = cfg.coprocessor.region_bucket_size;
359-
let mut raftstore_cfg = cfg.tikv.raft_store;
360-
raftstore_cfg.optimize_for(false);
361-
raftstore_cfg
362-
.validate(
363-
region_split_size,
364-
enable_region_bucket,
365-
region_bucket_size,
366-
false,
367-
)
368-
.unwrap();
369-
let raft_store = Arc::new(VersionTrack::new(raftstore_cfg));
370357
cfg_controller.register(
371358
Module::Raftstore,
372359
Box::new(RaftstoreConfigManager::new(
373360
node.refresh_config_scheduler(),
374-
raft_store,
361+
store_config,
375362
)),
376363
);
377364

tests/failpoints/cases/test_split_region.rs

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use kvproto::{
1717
Mutation, Op, PessimisticLockRequest, PrewriteRequest, PrewriteRequestPessimisticAction::*,
1818
},
1919
metapb::Region,
20-
pdpb::CheckPolicy,
20+
pdpb::{self, CheckPolicy},
2121
raft_serverpb::{PeerState, RaftMessage},
2222
tikvpb::TikvClient,
2323
};
@@ -1610,3 +1610,65 @@ fn test_split_by_split_check_on_keys() {
16101610
// waiting the split,
16111611
cluster.wait_region_split(&region);
16121612
}
1613+
1614+
fn change(name: &str, value: &str) -> std::collections::HashMap<String, String> {
1615+
let mut m = std::collections::HashMap::new();
1616+
m.insert(name.to_owned(), value.to_owned());
1617+
m
1618+
}
1619+
1620+
#[test]
1621+
fn test_turn_off_manual_compaction_caused_by_no_valid_split_key() {
1622+
let mut cluster = new_node_cluster(0, 1);
1623+
cluster.run();
1624+
let r = cluster.get_region(b"");
1625+
cluster.must_split(&r, b"k1");
1626+
let r = cluster.get_region(b"k1");
1627+
cluster.must_split(&r, b"k2");
1628+
cluster.must_put(b"k1", b"val");
1629+
1630+
let (tx, rx) = sync_channel(5);
1631+
fail::cfg_callback("on_compact_range_cf", move || {
1632+
tx.send(true).unwrap();
1633+
})
1634+
.unwrap();
1635+
1636+
let safe_point_inject = "safe_point_inject";
1637+
fail::cfg(safe_point_inject, "return(100)").unwrap();
1638+
1639+
{
1640+
let sim = cluster.sim.rl();
1641+
let cfg_controller = sim.get_cfg_controller(1).unwrap();
1642+
cfg_controller
1643+
.update(change(
1644+
"raftstore.skip-manual-compaction-in-clean_up-worker",
1645+
"true",
1646+
))
1647+
.unwrap();
1648+
}
1649+
1650+
let r = cluster.get_region(b"k1");
1651+
cluster
1652+
.pd_client
1653+
.split_region(r.clone(), pdpb::CheckPolicy::Usekey, vec![b"k1".to_vec()]);
1654+
rx.recv_timeout(Duration::from_secs(1)).unwrap_err();
1655+
1656+
{
1657+
let sim = cluster.sim.rl();
1658+
let cfg_controller = sim.get_cfg_controller(1).unwrap();
1659+
cfg_controller
1660+
.update(change(
1661+
"raftstore.skip-manual-compaction-in-clean_up-worker",
1662+
"false",
1663+
))
1664+
.unwrap();
1665+
}
1666+
1667+
cluster
1668+
.pd_client
1669+
.split_region(r, pdpb::CheckPolicy::Usekey, vec![b"k1".to_vec()]);
1670+
fail::cfg(safe_point_inject, "return(200)").unwrap();
1671+
rx.recv_timeout(Duration::from_secs(1)).unwrap();
1672+
rx.recv_timeout(Duration::from_secs(1)).unwrap();
1673+
rx.try_recv().unwrap_err();
1674+
}

0 commit comments

Comments
 (0)