Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(meta): commit epoch in separate group delta #18893

Merged
merged 8 commits into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ message InputLevel {
repeated SstableInfo table_infos = 3;
}

message NewL0SubLevel {
repeated SstableInfo inserted_table_infos = 1;
}

message IntraLevelDelta {
uint32 level_idx = 1;
uint64 l0_sub_level_id = 2;
Expand Down Expand Up @@ -112,6 +116,7 @@ message GroupDelta {
GroupConstruct group_construct = 2;
GroupDestroy group_destroy = 3;
GroupMerge group_merge = 6;
NewL0SubLevel new_l0_sub_level = 7;
}
}

Expand Down
24 changes: 16 additions & 8 deletions src/ctl/src/cmd_impl/hummock/validate_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,14 +206,22 @@ pub async fn print_version_delta_in_archive(
}

fn match_delta(delta: &DeltaType, sst_id: HummockSstableObjectId) -> bool {
let DeltaType::IntraLevel(delta) = delta else {
return false;
};
delta
.inserted_table_infos
.iter()
.any(|sst| sst.sst_id == sst_id)
|| delta.removed_table_ids.iter().any(|sst| *sst == sst_id)
match delta {
DeltaType::GroupConstruct(_) | DeltaType::GroupDestroy(_) | DeltaType::GroupMerge(_) => {
false
}
DeltaType::IntraLevel(delta) => {
delta
.inserted_table_infos
.iter()
.any(|sst| sst.sst_id == sst_id)
|| delta.removed_table_ids.iter().any(|sst| *sst == sst_id)
}
DeltaType::NewL0SubLevel(delta) => delta
.inserted_table_infos
.iter()
.any(|sst| sst.sst_id == sst_id),
}
}

fn print_delta(delta: &DeltaType) {
Expand Down
30 changes: 21 additions & 9 deletions src/meta/src/hummock/manager/checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@ use std::ops::Bound::{Excluded, Included};
use std::ops::{Deref, DerefMut};
use std::sync::atomic::Ordering;

use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{
object_size_map, summarize_group_deltas,
};
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_hummock_sdk::compaction_group::hummock_version_ext::object_size_map;
use risingwave_hummock_sdk::version::{GroupDeltaCommon, HummockVersion};
use risingwave_hummock_sdk::HummockVersionId;
use risingwave_pb::hummock::hummock_version_checkpoint::{PbStaleObjects, StaleObjects};
use risingwave_pb::hummock::{
Expand Down Expand Up @@ -156,13 +154,27 @@ impl HummockManager {
.hummock_version_deltas
.range((Excluded(old_checkpoint_id), Included(new_checkpoint_id)))
{
for (group_id, group_deltas) in &version_delta.group_deltas {
let summary = summarize_group_deltas(group_deltas, *group_id);
for group_deltas in version_delta.group_deltas.values() {
object_sizes.extend(
summary
.insert_table_infos
group_deltas
.group_deltas
.iter()
.map(|t| (t.object_id, t.file_size))
.flat_map(|delta| {
match delta {
GroupDeltaCommon::IntraLevel(level_delta) => {
Some(level_delta.inserted_table_infos.iter())
}
GroupDeltaCommon::NewL0SubLevel(inserted_table_infos) => {
Some(inserted_table_infos.iter())
}
GroupDeltaCommon::GroupConstruct(_)
| GroupDeltaCommon::GroupDestroy(_)
| GroupDeltaCommon::GroupMerge(_) => None,
}
.into_iter()
.flatten()
.map(|t| (t.object_id, t.file_size))
})
.chain(
version_delta
.change_log_delta
Expand Down
9 changes: 3 additions & 6 deletions src/meta/src/hummock/manager/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ use rand::seq::SliceRandom;
use rand::thread_rng;
use risingwave_common::util::epoch::Epoch;
use risingwave_hummock_sdk::compact_task::{CompactTask, ReportTask};
use risingwave_hummock_sdk::compaction_group::hummock_version_ext::HummockLevelsExt;
use risingwave_hummock_sdk::key_range::KeyRange;
use risingwave_hummock_sdk::level::{InputLevel, Level, Levels};
use risingwave_hummock_sdk::sstable_info::SstableInfo;
Expand Down Expand Up @@ -153,17 +152,15 @@ impl<'a> HummockVersionTransaction<'a> {
.entry(compact_task.compaction_group_id)
.or_default()
.group_deltas;
let mut removed_table_ids_map: BTreeMap<u32, Vec<u64>> = BTreeMap::default();
let mut removed_table_ids_map: BTreeMap<u32, HashSet<u64>> = BTreeMap::default();

for level in &compact_task.input_ssts {
let level_idx = level.level_idx;
let mut removed_table_ids =
level.table_infos.iter().map(|sst| sst.sst_id).collect_vec();

removed_table_ids_map
.entry(level_idx)
.or_default()
.append(&mut removed_table_ids);
.extend(level.table_infos.iter().map(|sst| sst.sst_id));
}

for (level_idx, removed_table_ids) in removed_table_ids_map {
Expand All @@ -181,7 +178,7 @@ impl<'a> HummockVersionTransaction<'a> {
let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new(
compact_task.target_level,
compact_task.target_sub_level_id,
vec![], // default
HashSet::new(), // default
compact_task.sorted_output_ssts.clone(),
compact_task.split_weight_by_vnode,
));
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/hummock/manager/time_travel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ impl HummockManager {
}
let written = write_sstable_infos(
delta
.newly_added_sst_infos(&select_groups)
.newly_added_sst_infos(Some(&select_groups))
.filter(|s| !skip_sst_ids.contains(&s.sst_id)),
txn,
)
Expand Down
12 changes: 2 additions & 10 deletions src/meta/src/hummock/manager/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@ use risingwave_hummock_sdk::change_log::ChangeLogDelta;
use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_watermark::TableWatermarks;
use risingwave_hummock_sdk::version::{
GroupDelta, HummockVersion, HummockVersionDelta, IntraLevelDelta,
};
use risingwave_hummock_sdk::version::{GroupDelta, HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::{
CompactionGroupId, FrontendHummockVersionDelta, HummockEpoch, HummockVersionId,
};
Expand Down Expand Up @@ -154,13 +152,7 @@ impl<'a> HummockVersionTransaction<'a> {
.entry(compaction_group_id)
.or_default()
.group_deltas;
let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new(
0,
0, // l0_sub_level_id will be generated during apply_version_delta
vec![], // default
inserted_table_infos,
0, // default
));
let group_delta = GroupDelta::NewL0SubLevel(inserted_table_infos);

group_deltas.push(group_delta);
}
Expand Down
1 change: 1 addition & 0 deletions src/prost/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.type_attribute("hummock.GroupTableChange", "#[derive(Eq)]")
.type_attribute("hummock.GroupMerge", "#[derive(Eq)]")
.type_attribute("hummock.GroupDelta", "#[derive(Eq)]")
.type_attribute("hummock.NewL0SubLevel", "#[derive(Eq)]")
.type_attribute("hummock.LevelHandler.RunningCompactTask", "#[derive(Eq)]")
.type_attribute("hummock.LevelHandler", "#[derive(Eq)]")
.type_attribute("hummock.TableOption", "#[derive(Eq)]")
Expand Down
Loading
Loading