From 9a32e75f227f837a6f16fb3afe159c1f8af2ad5b Mon Sep 17 00:00:00 2001 From: Li0k Date: Wed, 6 Nov 2024 09:25:37 +0800 Subject: [PATCH] feat(compaction): default new compaction group for new table (#19080) Co-authored-by: zwang28 <84491488@qq.com> --- src/meta/src/barrier/mod.rs | 20 +-- src/meta/src/hummock/manager/commit_epoch.rs | 120 +++++++----------- .../src/hummock/mock_hummock_meta_client.rs | 7 +- .../hummock_test/src/hummock_storage_tests.rs | 51 +++----- .../tests/integration_tests/compaction/mod.rs | 25 ++-- 5 files changed, 92 insertions(+), 131 deletions(-) diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 6a1cddc0ef748..7b9180bb154cc 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -1213,7 +1213,7 @@ impl GlobalBarrierWorkerContextImpl { if is_first_time { commit_info .new_table_fragment_infos - .push(NewTableFragmentInfo::NewCompactionGroup { + .push(NewTableFragmentInfo { table_ids: tables_to_commit, }); }; @@ -1747,14 +1747,16 @@ fn collect_commit_epoch_info( && !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_)) { let table_fragments = &info.table_fragments; - vec![NewTableFragmentInfo::Normal { - mv_table_id: table_fragments.mv_table_id().map(TableId::new), - internal_table_ids: table_fragments - .internal_table_ids() - .into_iter() - .map(TableId::new) - .collect(), - }] + let mut table_ids: HashSet<_> = table_fragments + .internal_table_ids() + .into_iter() + .map(TableId::new) + .collect(); + if let Some(mv_table_id) = table_fragments.mv_table_id() { + table_ids.insert(TableId::new(mv_table_id)); + } + + vec![NewTableFragmentInfo { table_ids }] } else { vec![] }; diff --git a/src/meta/src/hummock/manager/commit_epoch.rs b/src/meta/src/hummock/manager/commit_epoch.rs index 5875feabf6db0..c51c77a5d36a0 100644 --- a/src/meta/src/hummock/manager/commit_epoch.rs +++ b/src/meta/src/hummock/manager/commit_epoch.rs @@ -19,7 +19,6 @@ use risingwave_common::catalog::TableId; use risingwave_common::system_param::reader::SystemParamsRead; use risingwave_hummock_sdk::change_log::ChangeLogDelta; use risingwave_hummock_sdk::compaction_group::group_split::split_sst_with_table_ids; -use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; use risingwave_hummock_sdk::sstable_info::SstableInfo; use risingwave_hummock_sdk::table_stats::{ add_prost_table_stats_map, purge_prost_table_stats, to_prost_table_stats_map, PbTableStatsMap, @@ -49,14 +48,8 @@ use crate::hummock::{ commit_multi_var_with_provided_txn, start_measure_real_process_timer, HummockManager, }; -pub enum NewTableFragmentInfo { - Normal { - mv_table_id: Option, - internal_table_ids: Vec, - }, - NewCompactionGroup { - table_ids: HashSet, - }, +pub struct NewTableFragmentInfo { + pub table_ids: HashSet, } #[derive(Default)] @@ -124,73 +117,48 @@ impl HummockManager { let mut compaction_group_config: Option> = None; // Add new table - for new_table_fragment_info in new_table_fragment_infos { - match new_table_fragment_info { - NewTableFragmentInfo::Normal { - mv_table_id, - internal_table_ids, - } => { - on_handle_add_new_table( - state_table_info, - &internal_table_ids, - StaticCompactionGroupId::StateDefault as u64, - &mut table_compaction_group_mapping, - &mut new_table_ids, - )?; - - on_handle_add_new_table( - state_table_info, - &mv_table_id, - StaticCompactionGroupId::MaterializedView as u64, - &mut table_compaction_group_mapping, - &mut new_table_ids, - )?; - } - NewTableFragmentInfo::NewCompactionGroup { table_ids } => { - let (compaction_group_manager, compaction_group_config) = - if let Some(compaction_group_manager) = &mut compaction_group_manager_txn { - ( - compaction_group_manager, - (*compaction_group_config - .as_ref() - .expect("must be set with compaction_group_manager_txn")) - .clone(), - ) - } else { - let compaction_group_manager_guard = - self.compaction_group_manager.write().await; - let new_compaction_group_config = - compaction_group_manager_guard.default_compaction_config(); - compaction_group_config = Some(new_compaction_group_config.clone()); - ( - compaction_group_manager_txn.insert( - CompactionGroupManager::start_owned_compaction_groups_txn( - compaction_group_manager_guard, - ), - ), - new_compaction_group_config, - ) - }; - let new_compaction_group_id = next_compaction_group_id(&self.env).await?; - new_compaction_groups - .insert(new_compaction_group_id, compaction_group_config.clone()); - compaction_group_manager.insert( - new_compaction_group_id, - CompactionGroup { - group_id: new_compaction_group_id, - compaction_config: compaction_group_config, - }, - ); - - on_handle_add_new_table( - state_table_info, - &table_ids, - new_compaction_group_id, - &mut table_compaction_group_mapping, - &mut new_table_ids, - )?; - } - } + for NewTableFragmentInfo { table_ids } in new_table_fragment_infos { + let (compaction_group_manager, compaction_group_config) = + if let Some(compaction_group_manager) = &mut compaction_group_manager_txn { + ( + compaction_group_manager, + (*compaction_group_config + .as_ref() + .expect("must be set with compaction_group_manager_txn")) + .clone(), + ) + } else { + let compaction_group_manager_guard = + self.compaction_group_manager.write().await; + let new_compaction_group_config = + compaction_group_manager_guard.default_compaction_config(); + compaction_group_config = Some(new_compaction_group_config.clone()); + ( + compaction_group_manager_txn.insert( + CompactionGroupManager::start_owned_compaction_groups_txn( + compaction_group_manager_guard, + ), + ), + new_compaction_group_config, + ) + }; + let new_compaction_group_id = next_compaction_group_id(&self.env).await?; + new_compaction_groups.insert(new_compaction_group_id, compaction_group_config.clone()); + compaction_group_manager.insert( + new_compaction_group_id, + CompactionGroup { + group_id: new_compaction_group_id, + compaction_config: compaction_group_config, + }, + ); + + on_handle_add_new_table( + state_table_info, + &table_ids, + new_compaction_group_id, + &mut table_compaction_group_mapping, + &mut new_table_ids, + )?; } let commit_sstables = self diff --git a/src/meta/src/hummock/mock_hummock_meta_client.rs b/src/meta/src/hummock/mock_hummock_meta_client.rs index 805db163587a0..75bc3f121f43f 100644 --- a/src/meta/src/hummock/mock_hummock_meta_client.rs +++ b/src/meta/src/hummock/mock_hummock_meta_client.rs @@ -177,13 +177,12 @@ impl HummockMetaClient for MockHummockMetaClient { { vec![] } else { - vec![NewTableFragmentInfo::Normal { - mv_table_id: None, - internal_table_ids: commit_table_ids + vec![NewTableFragmentInfo { + table_ids: commit_table_ids .iter() .cloned() .map(TableId::from) - .collect_vec(), + .collect(), }] }; diff --git a/src/storage/hummock_test/src/hummock_storage_tests.rs b/src/storage/hummock_test/src/hummock_storage_tests.rs index 4e6ab26a539c6..24ba1ca1cf779 100644 --- a/src/storage/hummock_test/src/hummock_storage_tests.rs +++ b/src/storage/hummock_test/src/hummock_storage_tests.rs @@ -26,7 +26,6 @@ use risingwave_common::catalog::TableId; use risingwave_common::hash::VirtualNode; use risingwave_common::range::RangeBoundsExt; use risingwave_common::util::epoch::{test_epoch, EpochExt, INVALID_EPOCH}; -use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; use risingwave_hummock_sdk::key::{ gen_key_from_bytes, prefixed_range_with_vnode, FullKey, TableKey, UserKey, TABLE_PREFIX_LEN, }; @@ -36,6 +35,7 @@ use risingwave_hummock_sdk::table_watermark::{ TableWatermarksIndex, VnodeWatermark, WatermarkDirection, }; use risingwave_hummock_sdk::{EpochWithGap, LocalSstableInfo}; +use risingwave_meta::hummock::test_utils::get_compaction_group_id_by_table_id; use risingwave_meta::hummock::{CommitEpochInfo, NewTableFragmentInfo}; use risingwave_rpc_client::HummockMetaClient; use risingwave_storage::hummock::local_version::pinned_version::PinnedVersion; @@ -2635,20 +2635,20 @@ async fn test_commit_multi_epoch() { commit_epoch( epoch1, sst1_epoch1.clone(), - vec![NewTableFragmentInfo::Normal { - mv_table_id: None, - internal_table_ids: vec![existing_table_id], + vec![NewTableFragmentInfo { + table_ids: HashSet::from_iter([existing_table_id]), }], &[existing_table_id], ) .await; - let old_cg_id_set: HashSet<_> = { + let cg_id = + get_compaction_group_id_by_table_id(test_env.manager.clone(), existing_table_id.table_id()) + .await; + + { let version = test_env.manager.get_current_version().await; - let cg = version - .levels - .get(&(StaticCompactionGroupId::StateDefault as _)) - .unwrap(); + let cg = version.levels.get(&(cg_id)).unwrap(); let sub_levels = &cg.l0.sub_levels; assert_eq!(sub_levels.len(), 1); let sub_level = &sub_levels[0]; @@ -2661,13 +2661,8 @@ async fn test_commit_multi_epoch() { .get(&existing_table_id) .unwrap(); assert_eq!(epoch1, info.committed_epoch); - assert_eq!( - StaticCompactionGroupId::StateDefault as u64, - info.compaction_group_id - ); - - version.levels.keys().cloned().collect() - }; + assert_eq!(cg_id, info.compaction_group_id); + } let sst1_epoch2 = SstableInfo { sst_id: 22, @@ -2684,10 +2679,7 @@ async fn test_commit_multi_epoch() { { let version = test_env.manager.get_current_version().await; - let cg = version - .levels - .get(&(StaticCompactionGroupId::StateDefault as _)) - .unwrap(); + let cg = version.levels.get(&(cg_id)).unwrap(); let sub_levels = &cg.l0.sub_levels; assert_eq!(sub_levels.len(), 2); let sub_level = &sub_levels[0]; @@ -2703,10 +2695,7 @@ async fn test_commit_multi_epoch() { .get(&existing_table_id) .unwrap(); assert_eq!(epoch2, info.committed_epoch); - assert_eq!( - StaticCompactionGroupId::StateDefault as u64, - info.compaction_group_id - ); + assert_eq!(cg_id, info.compaction_group_id); }; let new_table_id = TableId::new(2); @@ -2723,7 +2712,7 @@ async fn test_commit_multi_epoch() { commit_epoch( epoch1, sst2_epoch1.clone(), - vec![NewTableFragmentInfo::NewCompactionGroup { + vec![NewTableFragmentInfo { table_ids: HashSet::from_iter([new_table_id]), }], &[new_table_id], @@ -2732,10 +2721,9 @@ async fn test_commit_multi_epoch() { let new_cg_id = { let version = test_env.manager.get_current_version().await; - let new_cg_id_set: HashSet<_> = version.levels.keys().cloned().collect(); - let added_cg_id_set = &new_cg_id_set - &old_cg_id_set; - assert_eq!(added_cg_id_set.len(), 1); - let new_cg_id = added_cg_id_set.into_iter().next().unwrap(); + let new_cg_id = + get_compaction_group_id_by_table_id(test_env.manager.clone(), new_table_id.table_id()) + .await; let new_cg = version.levels.get(&new_cg_id).unwrap(); let sub_levels = &new_cg.l0.sub_levels; @@ -2801,10 +2789,7 @@ async fn test_commit_multi_epoch() { { let version = test_env.manager.get_current_version().await; - let old_cg = version - .levels - .get(&(StaticCompactionGroupId::StateDefault as _)) - .unwrap(); + let old_cg = version.levels.get(&cg_id).unwrap(); let sub_levels = &old_cg.l0.sub_levels; assert_eq!(sub_levels.len(), 3); let sub_level1 = &sub_levels[0]; diff --git a/src/tests/simulation/tests/integration_tests/compaction/mod.rs b/src/tests/simulation/tests/integration_tests/compaction/mod.rs index f9f23ac72387c..8bbd0cbd31389 100644 --- a/src/tests/simulation/tests/integration_tests/compaction/mod.rs +++ b/src/tests/simulation/tests/integration_tests/compaction/mod.rs @@ -122,18 +122,25 @@ async fn test_vnode_watermark_reclaim_impl( .parse::() .unwrap(); - // Move the table to a dedicated group to prevent its vnode watermark from being reclaimed during the compaction of other tables. - cluster.split_compaction_group(2, table_id).await.unwrap(); tokio::time::sleep(Duration::from_secs(5)).await; - let compaction_group_id = session - .run(format!( - "SELECT id FROM rw_hummock_compaction_group_configs where member_tables @> '[{}]'::jsonb;", - table_id - )) + async fn compaction_group_id_by_table_id(session: &mut Session, table_id: u64) -> u64 { + session + .run(format!( + "SELECT id FROM rw_hummock_compaction_group_configs where member_tables @> '[{}]'::jsonb;", + table_id + )) + .await + .unwrap() + .parse::() + .unwrap() + } + let original_compaction_group_id = compaction_group_id_by_table_id(session, table_id).await; + // Move the table to a dedicated group to prevent its vnode watermark from being reclaimed during the compaction of other tables. + cluster + .split_compaction_group(original_compaction_group_id, table_id) .await - .unwrap() - .parse::() .unwrap(); + let compaction_group_id = compaction_group_id_by_table_id(session, table_id).await; session .run("INSERT INTO t2 VALUES (now(), 1);")