From f0467ca1f25d9afe586adc3109cd7d87453591c1 Mon Sep 17 00:00:00 2001 From: William Wen Date: Mon, 14 Oct 2024 16:25:43 +0800 Subject: [PATCH 1/7] feat(meta): commit epoch in separate group delta --- proto/hummock.proto | 5 + src/meta/src/hummock/manager/checkpoint.rs | 27 +- .../src/hummock/manager/compaction/mod.rs | 8 +- src/meta/src/hummock/manager/transaction.rs | 12 +- src/prost/build.rs | 1 + .../compaction_group/hummock_version_ext.rs | 361 ++++++++---------- src/storage/hummock_sdk/src/version.rs | 52 ++- 7 files changed, 226 insertions(+), 240 deletions(-) diff --git a/proto/hummock.proto b/proto/hummock.proto index 6d3d7b0d32d7d..c95912ebd94ef 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -63,6 +63,10 @@ message InputLevel { repeated SstableInfo table_infos = 3; } +message NewL0SubLevel { + repeated SstableInfo inserted_table_infos = 1; +} + message IntraLevelDelta { uint32 level_idx = 1; uint64 l0_sub_level_id = 2; @@ -112,6 +116,7 @@ message GroupDelta { GroupConstruct group_construct = 2; GroupDestroy group_destroy = 3; GroupMerge group_merge = 6; + NewL0SubLevel new_l0_sub_level = 7; } } diff --git a/src/meta/src/hummock/manager/checkpoint.rs b/src/meta/src/hummock/manager/checkpoint.rs index f678014d440c8..8c6a57c7347ce 100644 --- a/src/meta/src/hummock/manager/checkpoint.rs +++ b/src/meta/src/hummock/manager/checkpoint.rs @@ -17,10 +17,8 @@ use std::ops::Bound::{Excluded, Included}; use std::ops::{Deref, DerefMut}; use std::sync::atomic::Ordering; -use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{ - object_size_map, summarize_group_deltas, -}; -use risingwave_hummock_sdk::version::HummockVersion; +use risingwave_hummock_sdk::compaction_group::hummock_version_ext::object_size_map; +use risingwave_hummock_sdk::version::{GroupDeltaCommon, HummockVersion}; use risingwave_hummock_sdk::HummockVersionId; use risingwave_pb::hummock::hummock_version_checkpoint::{PbStaleObjects, StaleObjects}; use risingwave_pb::hummock::{ @@ -156,13 +154,24 @@ impl HummockManager { .hummock_version_deltas .range((Excluded(old_checkpoint_id), Included(new_checkpoint_id))) { - for (group_id, group_deltas) in &version_delta.group_deltas { - let summary = summarize_group_deltas(group_deltas, *group_id); + for group_deltas in version_delta.group_deltas.values() { object_sizes.extend( - summary - .insert_table_infos + group_deltas + .group_deltas .iter() - .map(|t| (t.object_id, t.file_size)) + .flat_map(|delta| { + match delta { + GroupDeltaCommon::IntraLevel(level_delta) => Some(level_delta), + _ => None, + } + .into_iter() + .flat_map(|level_delta| { + level_delta + .inserted_table_infos + .iter() + .map(|t| (t.object_id, t.file_size)) + }) + }) .chain( version_delta .change_log_delta diff --git a/src/meta/src/hummock/manager/compaction/mod.rs b/src/meta/src/hummock/manager/compaction/mod.rs index f94c01efbd7ae..d8cbcad27c095 100644 --- a/src/meta/src/hummock/manager/compaction/mod.rs +++ b/src/meta/src/hummock/manager/compaction/mod.rs @@ -153,17 +153,15 @@ impl<'a> HummockVersionTransaction<'a> { .entry(compact_task.compaction_group_id) .or_default() .group_deltas; - let mut removed_table_ids_map: BTreeMap> = BTreeMap::default(); + let mut removed_table_ids_map: BTreeMap> = BTreeMap::default(); for level in &compact_task.input_ssts { let level_idx = level.level_idx; - let mut removed_table_ids = - level.table_infos.iter().map(|sst| sst.sst_id).collect_vec(); removed_table_ids_map .entry(level_idx) .or_default() - .append(&mut removed_table_ids); + .extend(level.table_infos.iter().map(|sst| sst.sst_id)); } for (level_idx, removed_table_ids) in removed_table_ids_map { @@ -181,7 +179,7 @@ impl<'a> HummockVersionTransaction<'a> { let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new( compact_task.target_level, compact_task.target_sub_level_id, - vec![], // default + HashSet::new(), // default compact_task.sorted_output_ssts.clone(), compact_task.split_weight_by_vnode, )); diff --git a/src/meta/src/hummock/manager/transaction.rs b/src/meta/src/hummock/manager/transaction.rs index 04d5e237a11df..b59490d5beadb 100644 --- a/src/meta/src/hummock/manager/transaction.rs +++ b/src/meta/src/hummock/manager/transaction.rs @@ -20,9 +20,7 @@ use risingwave_hummock_sdk::change_log::ChangeLogDelta; use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; use risingwave_hummock_sdk::sstable_info::SstableInfo; use risingwave_hummock_sdk::table_watermark::TableWatermarks; -use risingwave_hummock_sdk::version::{ - GroupDelta, HummockVersion, HummockVersionDelta, IntraLevelDelta, -}; +use risingwave_hummock_sdk::version::{GroupDelta, HummockVersion, HummockVersionDelta}; use risingwave_hummock_sdk::{ CompactionGroupId, FrontendHummockVersionDelta, HummockEpoch, HummockVersionId, }; @@ -154,13 +152,7 @@ impl<'a> HummockVersionTransaction<'a> { .entry(compaction_group_id) .or_default() .group_deltas; - let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new( - 0, - 0, // l0_sub_level_id will be generated during apply_version_delta - vec![], // default - inserted_table_infos, - 0, // default - )); + let group_delta = GroupDelta::NewL0SubLevel(inserted_table_infos); group_deltas.push(group_delta); } diff --git a/src/prost/build.rs b/src/prost/build.rs index ee04705ef19e5..0e1b2ea5c1db6 100644 --- a/src/prost/build.rs +++ b/src/prost/build.rs @@ -183,6 +183,7 @@ fn main() -> Result<(), Box> { .type_attribute("hummock.GroupTableChange", "#[derive(Eq)]") .type_attribute("hummock.GroupMerge", "#[derive(Eq)]") .type_attribute("hummock.GroupDelta", "#[derive(Eq)]") + .type_attribute("hummock.NewL0SubLevel", "#[derive(Eq)]") .type_attribute("hummock.LevelHandler.RunningCompactTask", "#[derive(Eq)]") .type_attribute("hummock.LevelHandler", "#[derive(Eq)]") .type_attribute("hummock.TableOption", "#[derive(Eq)]") diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index 0ffdd15eca498..b3cdcbe2aead8 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -22,8 +22,7 @@ use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_common::hash::VnodeBitmapExt; use risingwave_pb::hummock::{ - CompactionConfig, CompatibilityVersion, GroupConstruct, GroupMerge, PbLevelType, - StateTableInfo, StateTableInfoDelta, + CompactionConfig, CompatibilityVersion, PbLevelType, StateTableInfo, StateTableInfoDelta, }; use tracing::warn; @@ -36,83 +35,11 @@ use crate::level::{Level, Levels, OverlappingLevel}; use crate::sstable_info::SstableInfo; use crate::table_watermark::{ReadTableWatermark, TableWatermarks}; use crate::version::{ - GroupDelta, GroupDeltas, HummockVersion, HummockVersionCommon, HummockVersionDelta, - HummockVersionStateTableInfo, IntraLevelDelta, + GroupDelta, GroupDeltaCommon, HummockVersion, HummockVersionCommon, HummockVersionDelta, + HummockVersionStateTableInfo, IntraLevelDelta, IntraLevelDeltaCommon, }; use crate::{can_concat, CompactionGroupId, HummockSstableId, HummockSstableObjectId}; -pub struct GroupDeltasSummary { - pub delete_sst_levels: Vec, - pub delete_sst_ids_set: HashSet, - pub insert_sst_level_id: u32, - pub insert_sub_level_id: u64, - pub insert_table_infos: Vec, - pub group_construct: Option, - pub group_destroy: Option, - pub new_vnode_partition_count: u32, - pub group_merge: Option, -} - -pub fn summarize_group_deltas( - group_deltas: &GroupDeltas, - compaction_group_id: CompactionGroupId, -) -> GroupDeltasSummary { - let mut delete_sst_levels = Vec::with_capacity(group_deltas.group_deltas.len()); - let mut delete_sst_ids_set = HashSet::new(); - let mut insert_sst_level_id = u32::MAX; - let mut insert_sub_level_id = u64::MAX; - let mut insert_table_infos = vec![]; - let mut group_construct = None; - let mut group_destroy = None; - let mut new_vnode_partition_count = 0; - let mut group_merge = None; - - for group_delta in &group_deltas.group_deltas { - match group_delta { - GroupDelta::IntraLevel(intra_level) => { - if !intra_level.removed_table_ids.is_empty() { - delete_sst_levels.push(intra_level.level_idx); - delete_sst_ids_set.extend(intra_level.removed_table_ids.iter().clone()); - } - if !intra_level.inserted_table_infos.is_empty() { - insert_sst_level_id = intra_level.level_idx; - insert_sub_level_id = intra_level.l0_sub_level_id; - insert_table_infos.extend(intra_level.inserted_table_infos.iter().cloned()); - } - new_vnode_partition_count = intra_level.vnode_partition_count; - } - GroupDelta::GroupConstruct(construct_delta) => { - assert!(group_construct.is_none()); - group_construct = Some(construct_delta.clone()); - } - GroupDelta::GroupDestroy(_) => { - assert!(group_destroy.is_none()); - group_destroy = Some(compaction_group_id); - } - GroupDelta::GroupMerge(merge_delta) => { - assert!(group_merge.is_none()); - group_merge = Some(*merge_delta); - group_destroy = Some(merge_delta.right_group_id); - } - } - } - - delete_sst_levels.sort(); - delete_sst_levels.dedup(); - - GroupDeltasSummary { - delete_sst_levels, - delete_sst_ids_set, - insert_sst_level_id, - insert_sub_level_id, - insert_table_infos, - group_construct, - group_destroy, - new_vnode_partition_count, - group_merge, - } -} - #[derive(Clone, Default)] pub struct TableGroupInfo { pub group_id: CompactionGroupId, @@ -587,118 +514,136 @@ impl HummockVersion { // apply to `levels`, which is different compaction groups for (compaction_group_id, group_deltas) in &version_delta.group_deltas { - let summary = summarize_group_deltas(group_deltas, *compaction_group_id); - if let Some(group_construct) = &summary.group_construct { - let mut new_levels = build_initial_compaction_group_levels( - *compaction_group_id, - group_construct.get_group_config().unwrap(), - ); - let parent_group_id = group_construct.parent_group_id; - new_levels.parent_group_id = parent_group_id; - #[expect(deprecated)] - // for backward-compatibility of previous hummock version delta - new_levels - .member_table_ids - .clone_from(&group_construct.table_ids); - self.levels.insert(*compaction_group_id, new_levels); - let member_table_ids = - if group_construct.version >= CompatibilityVersion::NoMemberTableIds as _ { - self.state_table_info - .compaction_group_member_table_ids(*compaction_group_id) - .iter() - .map(|table_id| table_id.table_id) - .collect() - } else { + for group_delta in &group_deltas.group_deltas { + match group_delta { + GroupDeltaCommon::GroupConstruct(group_construct) => { + let mut new_levels = build_initial_compaction_group_levels( + *compaction_group_id, + group_construct.get_group_config().unwrap(), + ); + let parent_group_id = group_construct.parent_group_id; + new_levels.parent_group_id = parent_group_id; #[expect(deprecated)] // for backward-compatibility of previous hummock version delta - BTreeSet::from_iter(group_construct.table_ids.clone()) - }; - - if group_construct.version >= CompatibilityVersion::SplitGroupByTableId as _ { - let split_key = if group_construct.split_key.is_some() { - Some(Bytes::from(group_construct.split_key.clone().unwrap())) - } else { - None - }; - self.init_with_parent_group_v2( - parent_group_id, - *compaction_group_id, - group_construct.get_new_sst_start_id(), - split_key.clone(), - ); - } else { - // for backward-compatibility of previous hummock version delta - self.init_with_parent_group( - parent_group_id, - *compaction_group_id, - member_table_ids, - group_construct.get_new_sst_start_id(), - ); - } - } else if let Some(group_merge) = &summary.group_merge { - tracing::info!( - "group_merge left {:?} right {:?}", - group_merge.left_group_id, - group_merge.right_group_id - ); - self.merge_compaction_group(group_merge.left_group_id, group_merge.right_group_id) - } - let group_destroy = summary.group_destroy; - let levels = self.levels.get_mut(compaction_group_id).unwrap_or_else(|| { - panic!("compaction group {} does not exist", compaction_group_id) - }); - - if is_commit_epoch { - let GroupDeltasSummary { - delete_sst_levels, - delete_sst_ids_set, - .. - } = summary; - - assert!( - delete_sst_levels.is_empty() && delete_sst_ids_set.is_empty() - || group_destroy.is_some(), - "no sst should be deleted when committing an epoch" - ); - let mut next_l0_sub_level_id = levels - .l0 - .sub_levels - .last() - .map(|level| level.sub_level_id + 1) - .unwrap_or(1); - for group_delta in &group_deltas.group_deltas { - if let GroupDelta::IntraLevel(IntraLevelDelta { - level_idx, - inserted_table_infos, - .. - }) = group_delta - { - assert_eq!( - *level_idx, 0, - "we should only add to L0 when we commit an epoch." + new_levels + .member_table_ids + .clone_from(&group_construct.table_ids); + self.levels.insert(*compaction_group_id, new_levels); + let member_table_ids = if group_construct.version + >= CompatibilityVersion::NoMemberTableIds as _ + { + self.state_table_info + .compaction_group_member_table_ids(*compaction_group_id) + .iter() + .map(|table_id| table_id.table_id) + .collect() + } else { + #[expect(deprecated)] + // for backward-compatibility of previous hummock version delta + BTreeSet::from_iter(group_construct.table_ids.clone()) + }; + + if group_construct.version >= CompatibilityVersion::SplitGroupByTableId as _ + { + let split_key = if group_construct.split_key.is_some() { + Some(Bytes::from(group_construct.split_key.clone().unwrap())) + } else { + None + }; + self.init_with_parent_group_v2( + parent_group_id, + *compaction_group_id, + group_construct.get_new_sst_start_id(), + split_key.clone(), + ); + } else { + // for backward-compatibility of previous hummock version delta + self.init_with_parent_group( + parent_group_id, + *compaction_group_id, + member_table_ids, + group_construct.get_new_sst_start_id(), + ); + } + } + GroupDeltaCommon::GroupMerge(group_merge) => { + tracing::info!( + "group_merge left {:?} right {:?}", + group_merge.left_group_id, + group_merge.right_group_id ); - if !inserted_table_infos.is_empty() { - insert_new_sub_level( - &mut levels.l0, - next_l0_sub_level_id, - PbLevelType::Overlapping, - inserted_table_infos.clone(), - None, + self.merge_compaction_group( + group_merge.left_group_id, + group_merge.right_group_id, + ) + } + GroupDeltaCommon::IntraLevel(level_delta) => { + let levels = + self.levels.get_mut(compaction_group_id).unwrap_or_else(|| { + panic!("compaction group {} does not exist", compaction_group_id) + }); + if is_commit_epoch { + assert!( + level_delta.removed_table_ids.is_empty(), + "no sst should be deleted when committing an epoch" + ); + + let IntraLevelDelta { + level_idx, + l0_sub_level_id, + inserted_table_infos, + .. + } = level_delta; + { + assert_eq!( + *level_idx, 0, + "we should only add to L0 when we commit an epoch." + ); + if !inserted_table_infos.is_empty() { + insert_new_sub_level( + &mut levels.l0, + *l0_sub_level_id, + PbLevelType::Overlapping, + inserted_table_infos.clone(), + None, + ); + } + } + } else { + // The delta is caused by compaction. + levels.apply_compact_ssts( + level_delta, + self.state_table_info + .compaction_group_member_table_ids(*compaction_group_id), ); - next_l0_sub_level_id += 1; } } + GroupDeltaCommon::NewL0SubLevel(inserted_table_infos) => { + let levels = + self.levels.get_mut(compaction_group_id).unwrap_or_else(|| { + panic!("compaction group {} does not exist", compaction_group_id) + }); + assert!(is_commit_epoch); + + let next_l0_sub_level_id = levels + .l0 + .sub_levels + .last() + .map(|level| level.sub_level_id + 1) + .unwrap_or(1); + + insert_new_sub_level( + &mut levels.l0, + next_l0_sub_level_id, + PbLevelType::Overlapping, + inserted_table_infos.clone(), + None, + ); + } + GroupDeltaCommon::GroupDestroy(_) => { + self.levels.remove(compaction_group_id); + } } - } else { - // The delta is caused by compaction. - levels.apply_compact_ssts( - summary, - self.state_table_info - .compaction_group_member_table_ids(*compaction_group_id), - ); - } - if let Some(destroy_group_id) = &group_destroy { - self.levels.remove(destroy_group_id); } } self.id = version_delta.id; @@ -1009,50 +954,50 @@ impl HummockVersionCommon { impl Levels { pub fn apply_compact_ssts( &mut self, - summary: GroupDeltasSummary, + level_delta: &IntraLevelDeltaCommon, member_table_ids: &BTreeSet, ) { - let GroupDeltasSummary { - delete_sst_levels, - delete_sst_ids_set, - insert_sst_level_id, - insert_sub_level_id, - insert_table_infos, - new_vnode_partition_count, + let IntraLevelDeltaCommon { + level_idx, + l0_sub_level_id, + inserted_table_infos: insert_table_infos, + vnode_partition_count, + removed_table_ids: delete_sst_ids_set, .. - } = summary; + } = level_delta; + let new_vnode_partition_count = *vnode_partition_count; - if !self.check_deleted_sst_exist(&delete_sst_levels, delete_sst_ids_set.clone()) { + if !self.check_deleted_sst_exist(&[*level_idx], delete_sst_ids_set.clone()) { warn!( "This VersionDelta may be committed by an expired compact task. Please check it. \n - delete_sst_levels: {:?}\n, - delete_sst_ids_set: {:?}\n, insert_sst_level_id: {}\n, insert_sub_level_id: {}\n, - insert_table_infos: {:?}\n", - delete_sst_levels, - delete_sst_ids_set, - insert_sst_level_id, - insert_sub_level_id, + insert_table_infos: {:?}\n, + delete_sst_ids_set: {:?}\n", + level_idx, + l0_sub_level_id, insert_table_infos .iter() .map(|sst| (sst.sst_id, sst.object_id)) - .collect_vec() + .collect_vec(), + delete_sst_ids_set, ); return; } - for level_idx in &delete_sst_levels { + if !delete_sst_ids_set.is_empty() { if *level_idx == 0 { for level in &mut self.l0.sub_levels { - level_delete_ssts(level, &delete_sst_ids_set); + level_delete_ssts(level, delete_sst_ids_set); } } else { let idx = *level_idx as usize - 1; - level_delete_ssts(&mut self.levels[idx], &delete_sst_ids_set); + level_delete_ssts(&mut self.levels[idx], delete_sst_ids_set); } } if !insert_table_infos.is_empty() { + let insert_sst_level_id = *level_idx; + let insert_sub_level_id = *l0_sub_level_id; if insert_sst_level_id == 0 { let l0 = &mut self.l0; let index = l0 @@ -1093,7 +1038,7 @@ impl Levels { level_insert_ssts(&mut self.levels[idx], insert_table_infos); } } - if delete_sst_levels.iter().any(|level_id| *level_id == 0) { + if *level_idx == 0 && !delete_sst_ids_set.is_empty() { self.l0 .sub_levels .retain(|level| !level.table_infos.is_empty()); @@ -1358,7 +1303,7 @@ fn level_delete_ssts( original_len != operand.table_infos.len() } -fn level_insert_ssts(operand: &mut Level, insert_table_infos: Vec) { +fn level_insert_ssts(operand: &mut Level, insert_table_infos: &Vec) { operand.total_file_size += insert_table_infos .iter() .map(|sst| sst.sst_size) @@ -1367,7 +1312,9 @@ fn level_insert_ssts(operand: &mut Level, insert_table_infos: Vec) .iter() .map(|sst| sst.uncompressed_file_size) .sum::(); - operand.table_infos.extend(insert_table_infos); + operand + .table_infos + .extend(insert_table_infos.iter().cloned()); operand .table_infos .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range)); @@ -1501,7 +1448,7 @@ pub fn validate_version(version: &HummockVersion) -> Vec { #[cfg(test)] mod tests { - use std::collections::HashMap; + use std::collections::{HashMap, HashSet}; use bytes::Bytes; use risingwave_common::catalog::TableId; @@ -1655,7 +1602,7 @@ mod tests { group_deltas: vec![GroupDelta::IntraLevel(IntraLevelDelta::new( 1, 0, - vec![], + HashSet::new(), vec![SstableInfo { object_id: 1, sst_id: 1, diff --git a/src/storage/hummock_sdk/src/version.rs b/src/storage/hummock_sdk/src/version.rs index dbd927e3d724a..fd796c76783de 100644 --- a/src/storage/hummock_sdk/src/version.rs +++ b/src/storage/hummock_sdk/src/version.rs @@ -21,12 +21,12 @@ use std::sync::{Arc, LazyLock}; use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_common::util::epoch::INVALID_EPOCH; -use risingwave_pb::hummock::group_delta::PbDeltaType; +use risingwave_pb::hummock::group_delta::{DeltaType, PbDeltaType}; use risingwave_pb::hummock::hummock_version_delta::PbGroupDeltas; use risingwave_pb::hummock::{ CompactionConfig, PbGroupConstruct, PbGroupDelta, PbGroupDestroy, PbGroupMerge, - PbHummockVersion, PbHummockVersionDelta, PbIntraLevelDelta, PbSstableInfo, PbStateTableInfo, - StateTableInfo, StateTableInfoDelta, + PbHummockVersion, PbHummockVersionDelta, PbIntraLevelDelta, PbNewL0SubLevel, PbSstableInfo, + PbStateTableInfo, StateTableInfo, StateTableInfoDelta, }; use tracing::warn; @@ -785,7 +785,7 @@ where pub struct IntraLevelDeltaCommon { pub level_idx: u32, pub l0_sub_level_id: u64, - pub removed_table_ids: Vec, + pub removed_table_ids: HashSet, pub inserted_table_infos: Vec, pub vnode_partition_count: u32, } @@ -814,7 +814,7 @@ where Self { level_idx: pb_intra_level_delta.level_idx, l0_sub_level_id: pb_intra_level_delta.l0_sub_level_id, - removed_table_ids: pb_intra_level_delta.removed_table_ids, + removed_table_ids: HashSet::from_iter(pb_intra_level_delta.removed_table_ids), inserted_table_infos: pb_intra_level_delta .inserted_table_infos .into_iter() @@ -833,7 +833,7 @@ where Self { level_idx: intra_level_delta.level_idx, l0_sub_level_id: intra_level_delta.l0_sub_level_id, - removed_table_ids: intra_level_delta.removed_table_ids, + removed_table_ids: intra_level_delta.removed_table_ids.into_iter().collect(), inserted_table_infos: intra_level_delta .inserted_table_infos .into_iter() @@ -852,7 +852,11 @@ where Self { level_idx: intra_level_delta.level_idx, l0_sub_level_id: intra_level_delta.l0_sub_level_id, - removed_table_ids: intra_level_delta.removed_table_ids.clone(), + removed_table_ids: intra_level_delta + .removed_table_ids + .iter() + .cloned() + .collect(), inserted_table_infos: intra_level_delta .inserted_table_infos .iter() @@ -871,7 +875,9 @@ where Self { level_idx: pb_intra_level_delta.level_idx, l0_sub_level_id: pb_intra_level_delta.l0_sub_level_id, - removed_table_ids: pb_intra_level_delta.removed_table_ids.clone(), + removed_table_ids: HashSet::from_iter( + pb_intra_level_delta.removed_table_ids.iter().cloned(), + ), inserted_table_infos: pb_intra_level_delta .inserted_table_infos .iter() @@ -886,7 +892,7 @@ impl IntraLevelDelta { pub fn new( level_idx: u32, l0_sub_level_id: u64, - removed_table_ids: Vec, + removed_table_ids: HashSet, inserted_table_infos: Vec, vnode_partition_count: u32, ) -> Self { @@ -902,6 +908,7 @@ impl IntraLevelDelta { #[derive(Debug, PartialEq, Clone)] pub enum GroupDeltaCommon { + NewL0SubLevel(Vec), IntraLevel(IntraLevelDeltaCommon), GroupConstruct(PbGroupConstruct), GroupDestroy(PbGroupDestroy), @@ -928,6 +935,13 @@ where Some(PbDeltaType::GroupMerge(pb_group_merge)) => { GroupDeltaCommon::GroupMerge(pb_group_merge) } + Some(DeltaType::NewL0SubLevel(pb_new_sub_level)) => GroupDeltaCommon::NewL0SubLevel( + pb_new_sub_level + .inserted_table_infos + .into_iter() + .map(T::from) + .collect(), + ), None => panic!("delta_type is not set"), } } @@ -951,6 +965,14 @@ where GroupDeltaCommon::GroupMerge(pb_group_merge) => PbGroupDelta { delta_type: Some(PbDeltaType::GroupMerge(pb_group_merge)), }, + GroupDeltaCommon::NewL0SubLevel(new_sub_level) => PbGroupDelta { + delta_type: Some(PbDeltaType::NewL0SubLevel(PbNewL0SubLevel { + inserted_table_infos: new_sub_level + .into_iter() + .map(PbSstableInfo::from) + .collect(), + })), + }, } } } @@ -973,6 +995,11 @@ where GroupDeltaCommon::GroupMerge(pb_group_merge) => PbGroupDelta { delta_type: Some(PbDeltaType::GroupMerge(*pb_group_merge)), }, + GroupDeltaCommon::NewL0SubLevel(new_sub_level) => PbGroupDelta { + delta_type: Some(PbDeltaType::NewL0SubLevel(PbNewL0SubLevel { + inserted_table_infos: new_sub_level.iter().map(PbSstableInfo::from).collect(), + })), + }, } } } @@ -995,6 +1022,13 @@ where Some(PbDeltaType::GroupMerge(pb_group_merge)) => { GroupDeltaCommon::GroupMerge(*pb_group_merge) } + Some(DeltaType::NewL0SubLevel(pb_new_sub_level)) => GroupDeltaCommon::NewL0SubLevel( + pb_new_sub_level + .inserted_table_infos + .iter() + .map(T::from) + .collect(), + ), None => panic!("delta_type is not set"), } } From e98591096de7a454040b9e2391a6893c40f106ac Mon Sep 17 00:00:00 2001 From: William Wen Date: Mon, 14 Oct 2024 17:13:01 +0800 Subject: [PATCH 2/7] fix --- .../compaction_group/hummock_version_ext.rs | 49 ++++++++++++------- 1 file changed, 31 insertions(+), 18 deletions(-) diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index b3cdcbe2aead8..0fd6c44dd94b0 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -514,6 +514,7 @@ impl HummockVersion { // apply to `levels`, which is different compaction groups for (compaction_group_id, group_deltas) in &version_delta.group_deltas { + let mut is_l0_changed = false; for group_delta in &group_deltas.group_deltas { match group_delta { GroupDeltaCommon::GroupConstruct(group_construct) => { @@ -607,6 +608,7 @@ impl HummockVersion { inserted_table_infos.clone(), None, ); + is_l0_changed = true; } } } else { @@ -616,6 +618,9 @@ impl HummockVersion { self.state_table_info .compaction_group_member_table_ids(*compaction_group_id), ); + if level_delta.level_idx == 0 { + is_l0_changed = true; + } } } GroupDeltaCommon::NewL0SubLevel(inserted_table_infos) => { @@ -625,26 +630,32 @@ impl HummockVersion { }); assert!(is_commit_epoch); - let next_l0_sub_level_id = levels - .l0 - .sub_levels - .last() - .map(|level| level.sub_level_id + 1) - .unwrap_or(1); - - insert_new_sub_level( - &mut levels.l0, - next_l0_sub_level_id, - PbLevelType::Overlapping, - inserted_table_infos.clone(), - None, - ); + if !inserted_table_infos.is_empty() { + let next_l0_sub_level_id = levels + .l0 + .sub_levels + .last() + .map(|level| level.sub_level_id + 1) + .unwrap_or(1); + + insert_new_sub_level( + &mut levels.l0, + next_l0_sub_level_id, + PbLevelType::Overlapping, + inserted_table_infos.clone(), + None, + ); + is_l0_changed = true; + } } GroupDeltaCommon::GroupDestroy(_) => { self.levels.remove(compaction_group_id); } } } + if is_l0_changed && let Some(levels) = self.levels.get_mut(compaction_group_id) { + levels.update_l0(); + } } self.id = version_delta.id; #[expect(deprecated)] @@ -950,9 +961,8 @@ impl HummockVersionCommon { } } -#[easy_ext::ext(HummockLevelsExt)] impl Levels { - pub fn apply_compact_ssts( + pub(crate) fn apply_compact_ssts( &mut self, level_delta: &IntraLevelDeltaCommon, member_table_ids: &BTreeSet, @@ -1038,7 +1048,10 @@ impl Levels { level_insert_ssts(&mut self.levels[idx], insert_table_infos); } } - if *level_idx == 0 && !delete_sst_ids_set.is_empty() { + } + + pub(crate) fn update_l0(&mut self) { + { self.l0 .sub_levels .retain(|level| !level.table_infos.is_empty()); @@ -1057,7 +1070,7 @@ impl Levels { } } - pub fn check_deleted_sst_exist( + pub(crate) fn check_deleted_sst_exist( &self, delete_sst_levels: &[u32], mut delete_sst_ids_set: HashSet, From 957286a4aff4a835baa8b708da6d99d556e6c270 Mon Sep 17 00:00:00 2001 From: William Wen Date: Mon, 14 Oct 2024 17:16:51 +0800 Subject: [PATCH 3/7] fix pub --- src/meta/src/hummock/manager/compaction/mod.rs | 1 - .../hummock_sdk/src/compaction_group/hummock_version_ext.rs | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/src/meta/src/hummock/manager/compaction/mod.rs b/src/meta/src/hummock/manager/compaction/mod.rs index d8cbcad27c095..10638c11e594b 100644 --- a/src/meta/src/hummock/manager/compaction/mod.rs +++ b/src/meta/src/hummock/manager/compaction/mod.rs @@ -41,7 +41,6 @@ use rand::seq::SliceRandom; use rand::thread_rng; use risingwave_common::util::epoch::Epoch; use risingwave_hummock_sdk::compact_task::{CompactTask, ReportTask}; -use risingwave_hummock_sdk::compaction_group::hummock_version_ext::HummockLevelsExt; use risingwave_hummock_sdk::key_range::KeyRange; use risingwave_hummock_sdk::level::{InputLevel, Level, Levels}; use risingwave_hummock_sdk::sstable_info::SstableInfo; diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index 0fd6c44dd94b0..f5881f2e78e53 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -1070,7 +1070,7 @@ impl Levels { } } - pub(crate) fn check_deleted_sst_exist( + pub fn check_deleted_sst_exist( &self, delete_sst_levels: &[u32], mut delete_sst_ids_set: HashSet, From 3d7e9be992950154b6c0666f01afd4c20dc2af5a Mon Sep 17 00:00:00 2001 From: William Wen Date: Mon, 14 Oct 2024 17:40:20 +0800 Subject: [PATCH 4/7] fix --- src/meta/src/hummock/manager/checkpoint.rs | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/src/meta/src/hummock/manager/checkpoint.rs b/src/meta/src/hummock/manager/checkpoint.rs index 8c6a57c7347ce..ea747dbf402e5 100644 --- a/src/meta/src/hummock/manager/checkpoint.rs +++ b/src/meta/src/hummock/manager/checkpoint.rs @@ -161,16 +161,19 @@ impl HummockManager { .iter() .flat_map(|delta| { match delta { - GroupDeltaCommon::IntraLevel(level_delta) => Some(level_delta), - _ => None, + GroupDeltaCommon::IntraLevel(level_delta) => { + Some(level_delta.inserted_table_infos.iter()) + } + GroupDeltaCommon::NewL0SubLevel(inserted_table_infos) => { + Some(inserted_table_infos.iter()) + } + GroupDeltaCommon::GroupConstruct(_) + | GroupDeltaCommon::GroupDestroy(_) + | GroupDeltaCommon::GroupMerge(_) => None, } .into_iter() - .flat_map(|level_delta| { - level_delta - .inserted_table_infos - .iter() - .map(|t| (t.object_id, t.file_size)) - }) + .flatten() + .map(|t| (t.object_id, t.file_size)) }) .chain( version_delta From 385acc437bb844d46b3ef73f4b053df43e7f6e84 Mon Sep 17 00:00:00 2001 From: William Wen Date: Mon, 14 Oct 2024 18:10:08 +0800 Subject: [PATCH 5/7] fix --- src/meta/src/hummock/manager/time_travel.rs | 2 +- src/storage/hummock_sdk/src/version.rs | 74 +++++++-------------- 2 files changed, 24 insertions(+), 52 deletions(-) diff --git a/src/meta/src/hummock/manager/time_travel.rs b/src/meta/src/hummock/manager/time_travel.rs index 09cde1632cc94..6e7648a4d4de2 100644 --- a/src/meta/src/hummock/manager/time_travel.rs +++ b/src/meta/src/hummock/manager/time_travel.rs @@ -473,7 +473,7 @@ impl HummockManager { } let written = write_sstable_infos( delta - .newly_added_sst_infos(&select_groups) + .newly_added_sst_infos(Some(&select_groups)) .filter(|s| !skip_sst_ids.contains(&s.sst_id)), txn, ) diff --git a/src/storage/hummock_sdk/src/version.rs b/src/storage/hummock_sdk/src/version.rs index fd796c76783de..b1807ac038a87 100644 --- a/src/storage/hummock_sdk/src/version.rs +++ b/src/storage/hummock_sdk/src/version.rs @@ -512,76 +512,48 @@ impl HummockVersionDelta { /// Note: the result can be false positive because we only collect the set of sst object ids in the `inserted_table_infos`, /// but it is possible that the object is moved or split from other compaction groups or levels. pub fn newly_added_object_ids(&self) -> HashSet { - self.group_deltas - .values() - .flat_map(|group_deltas| { - group_deltas.group_deltas.iter().flat_map(|group_delta| { - static EMPTY_VEC: Vec = Vec::new(); - let sst_slice = if let GroupDelta::IntraLevel(level_delta) = &group_delta { - &level_delta.inserted_table_infos - } else { - &EMPTY_VEC - }; - sst_slice.iter().map(|sst| sst.object_id) - }) - }) - .chain(self.change_log_delta.values().flat_map(|delta| { - let new_log = delta.new_log.as_ref().unwrap(); - new_log - .new_value - .iter() - .map(|sst| sst.object_id) - .chain(new_log.old_value.iter().map(|sst| sst.object_id)) - })) + self.newly_added_sst_infos(None) + .map(|sst| sst.object_id) .collect() } pub fn newly_added_sst_ids(&self) -> HashSet { - let ssts_from_group_deltas = self.group_deltas.values().flat_map(|group_deltas| { - group_deltas.group_deltas.iter().flat_map(|group_delta| { - static EMPTY_VEC: Vec = Vec::new(); - let sst_slice = if let GroupDelta::IntraLevel(level_delta) = &group_delta { - &level_delta.inserted_table_infos - } else { - &EMPTY_VEC - }; - sst_slice.iter() + self.newly_added_sst_infos(None) + .map(|sst| { + // TODO: should we instead use sst.sst_id? + sst.object_id }) - }); - - let ssts_from_change_log = self.change_log_delta.values().flat_map(|delta| { - let new_log = delta.new_log.as_ref().unwrap(); - new_log.new_value.iter().chain(new_log.old_value.iter()) - }); - - ssts_from_group_deltas - .chain(ssts_from_change_log) - .map(|sst| sst.object_id) .collect() } pub fn newly_added_sst_infos<'a>( &'a self, - select_group: &'a HashSet, + select_group: Option<&'a HashSet>, ) -> impl Iterator + 'a { self.group_deltas .iter() - .filter_map(|(cg_id, group_deltas)| { - if select_group.contains(cg_id) { - Some(group_deltas) - } else { + .filter_map(move |(cg_id, group_deltas)| { + if let Some(select_group) = select_group + && !select_group.contains(cg_id) + { None + } else { + Some(group_deltas) } }) .flat_map(|group_deltas| { group_deltas.group_deltas.iter().flat_map(|group_delta| { - static EMPTY_VEC: Vec = Vec::new(); - let sst_slice = if let GroupDelta::IntraLevel(level_delta) = &group_delta { - &level_delta.inserted_table_infos - } else { - &EMPTY_VEC + let sst_slice = match &group_delta { + GroupDeltaCommon::NewL0SubLevel(inserted_table_infos) + | GroupDeltaCommon::IntraLevel(IntraLevelDeltaCommon { + inserted_table_infos, + .. + }) => Some(inserted_table_infos.iter()), + GroupDeltaCommon::GroupConstruct(_) + | GroupDeltaCommon::GroupDestroy(_) + | GroupDeltaCommon::GroupMerge(_) => None, }; - sst_slice.iter() + sst_slice.into_iter().flatten() }) }) .chain(self.change_log_delta.values().flat_map(|delta| { From b132d9d2ff4059271ff2205dc2d6f952f8ea1797 Mon Sep 17 00:00:00 2001 From: William Wen Date: Tue, 15 Oct 2024 12:15:53 +0800 Subject: [PATCH 6/7] cover more case --- .../src/cmd_impl/hummock/validate_version.rs | 24 ++++++--- .../compaction_group/hummock_version_ext.rs | 51 ++++++++++++------- 2 files changed, 48 insertions(+), 27 deletions(-) diff --git a/src/ctl/src/cmd_impl/hummock/validate_version.rs b/src/ctl/src/cmd_impl/hummock/validate_version.rs index b6ab7f111aaac..62e988f42f1cf 100644 --- a/src/ctl/src/cmd_impl/hummock/validate_version.rs +++ b/src/ctl/src/cmd_impl/hummock/validate_version.rs @@ -206,14 +206,22 @@ pub async fn print_version_delta_in_archive( } fn match_delta(delta: &DeltaType, sst_id: HummockSstableObjectId) -> bool { - let DeltaType::IntraLevel(delta) = delta else { - return false; - }; - delta - .inserted_table_infos - .iter() - .any(|sst| sst.sst_id == sst_id) - || delta.removed_table_ids.iter().any(|sst| *sst == sst_id) + match delta { + DeltaType::GroupConstruct(_) | DeltaType::GroupDestroy(_) | DeltaType::GroupMerge(_) => { + false + } + DeltaType::IntraLevel(delta) => { + delta + .inserted_table_infos + .iter() + .any(|sst| sst.sst_id == sst_id) + || delta.removed_table_ids.iter().any(|sst| *sst == sst_id) + } + DeltaType::NewL0SubLevel(delta) => delta + .inserted_table_infos + .iter() + .any(|sst| sst.sst_id == sst_id), + } } fn print_delta(delta: &DeltaType) { diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index f5881f2e78e53..78e129b40564b 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -420,11 +420,12 @@ impl HummockVersion { let mut removed_ssts: BTreeMap> = BTreeMap::new(); // Build only if all deltas are intra level deltas. - if !group_deltas - .group_deltas - .iter() - .all(|delta| matches!(delta, GroupDelta::IntraLevel(_))) - { + if !group_deltas.group_deltas.iter().all(|delta| { + matches!( + delta, + GroupDelta::IntraLevel(_) | GroupDelta::NewL0SubLevel(_) + ) + }) { continue; } @@ -432,24 +433,36 @@ impl HummockVersion { // current `hummock::manager::gen_version_delta` implementation. Better refactor the // struct to reduce conventions. for group_delta in &group_deltas.group_deltas { - if let GroupDelta::IntraLevel(intra_level) = group_delta { - if !intra_level.inserted_table_infos.is_empty() { - info.insert_sst_level = intra_level.level_idx; - info.insert_sst_infos - .extend(intra_level.inserted_table_infos.iter().cloned()); + match group_delta { + GroupDeltaCommon::NewL0SubLevel(inserted_table_infos) => { + if !inserted_table_infos.is_empty() { + info.insert_sst_level = 0; + info.insert_sst_infos + .extend(inserted_table_infos.iter().cloned()); + } } - if !intra_level.removed_table_ids.is_empty() { - for id in &intra_level.removed_table_ids { - if intra_level.level_idx == 0 { - removed_l0_ssts.insert(*id); - } else { - removed_ssts - .entry(intra_level.level_idx) - .or_default() - .insert(*id); + GroupDeltaCommon::IntraLevel(intra_level) => { + if !intra_level.inserted_table_infos.is_empty() { + info.insert_sst_level = intra_level.level_idx; + info.insert_sst_infos + .extend(intra_level.inserted_table_infos.iter().cloned()); + } + if !intra_level.removed_table_ids.is_empty() { + for id in &intra_level.removed_table_ids { + if intra_level.level_idx == 0 { + removed_l0_ssts.insert(*id); + } else { + removed_ssts + .entry(intra_level.level_idx) + .or_default() + .insert(*id); + } } } } + GroupDeltaCommon::GroupConstruct(_) + | GroupDeltaCommon::GroupDestroy(_) + | GroupDeltaCommon::GroupMerge(_) => {} } } From 2826323555dd79255e89044e7a8ee68aa873862b Mon Sep 17 00:00:00 2001 From: William Wen Date: Fri, 18 Oct 2024 13:43:37 +0800 Subject: [PATCH 7/7] rename --- .../src/compaction_group/hummock_version_ext.rs | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index ea7146a43c9f0..0189e136099b9 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -527,7 +527,7 @@ impl HummockVersion { // apply to `levels`, which is different compaction groups for (compaction_group_id, group_deltas) in &version_delta.group_deltas { - let mut is_l0_changed = false; + let mut is_applied_l0_compact = false; for group_delta in &group_deltas.group_deltas { match group_delta { GroupDeltaCommon::GroupConstruct(group_construct) => { @@ -621,7 +621,6 @@ impl HummockVersion { inserted_table_infos.clone(), None, ); - is_l0_changed = true; } } } else { @@ -632,7 +631,7 @@ impl HummockVersion { .compaction_group_member_table_ids(*compaction_group_id), ); if level_delta.level_idx == 0 { - is_l0_changed = true; + is_applied_l0_compact = true; } } } @@ -658,7 +657,6 @@ impl HummockVersion { inserted_table_infos.clone(), None, ); - is_l0_changed = true; } } GroupDeltaCommon::GroupDestroy(_) => { @@ -666,8 +664,9 @@ impl HummockVersion { } } } - if is_l0_changed && let Some(levels) = self.levels.get_mut(compaction_group_id) { - levels.update_l0(); + if is_applied_l0_compact && let Some(levels) = self.levels.get_mut(compaction_group_id) + { + levels.post_apply_l0_compact(); } } self.id = version_delta.id; @@ -1063,7 +1062,7 @@ impl Levels { } } - pub(crate) fn update_l0(&mut self) { + pub(crate) fn post_apply_l0_compact(&mut self) { { self.l0 .sub_levels