Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into siyuan/source-retry
Browse files Browse the repository at this point in the history
  • Loading branch information
StrikeW committed Dec 18, 2024
2 parents 91a6e1c + 3cbc231 commit 91bada1
Show file tree
Hide file tree
Showing 8 changed files with 165 additions and 158 deletions.
1 change: 1 addition & 0 deletions src/common/src/system_param/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,7 @@ macro_rules! impl_system_params_for_test {
ret.backup_storage_url = Some("memory".into());
ret.backup_storage_directory = Some("backup".into());
ret.use_new_object_prefix_strategy = Some(false);
ret.time_travel_retention_ms = Some(0);
ret
}
};
Expand Down
35 changes: 19 additions & 16 deletions src/meta/src/controller/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2799,11 +2799,6 @@ impl CatalogController {
inner.list_all_state_tables().await
}

/// Returns the ids of all state tables.
///
/// Acquires read access to `inner` and forwards to its `list_all_state_table_ids`.
pub async fn list_all_state_table_ids(&self) -> MetaResult<Vec<TableId>> {
    self.inner.read().await.list_all_state_table_ids().await
}

pub async fn list_readonly_table_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<TableId>> {
let inner = self.inner.read().await;
let table_ids: Vec<TableId> = Table::find()
Expand Down Expand Up @@ -3228,6 +3223,10 @@ impl CatalogController {
.collect();
Ok(res)
}

/// Returns the table ids reported by the inner controller's `list_time_travel_table_ids`.
///
/// Read access to `inner` is held for the duration of the call.
pub async fn list_time_travel_table_ids(&self) -> MetaResult<Vec<TableId>> {
    let inner = self.inner.read().await;
    inner.list_time_travel_table_ids().await
}
}

/// `CatalogStats` is a struct to store the statistics of all catalogs.
Expand Down Expand Up @@ -3360,17 +3359,6 @@ impl CatalogControllerInner {
.collect())
}

/// `list_all_state_table_ids` returns the ids of all state tables.
///
/// Only the `TableId` column is selected; no filter is applied to the query.
pub async fn list_all_state_table_ids(&self) -> MetaResult<Vec<TableId>> {
    // Build the id-only projection first, then execute it against the meta store.
    let id_query = Table::find()
        .select_only()
        .column(table::Column::TableId)
        .into_tuple();
    let table_ids: Vec<TableId> = id_query.all(&self.db).await?;
    Ok(table_ids)
}

/// `list_tables` returns all `CREATED` tables, `CREATING` materialized views and the internal tables that belong to them.
async fn list_tables(&self) -> MetaResult<Vec<PbTable>> {
let table_objs = Table::find()
Expand Down Expand Up @@ -3589,6 +3577,21 @@ impl CatalogControllerInner {
let _ = tx.send(Err(err.clone()));
}
}

/// Returns the ids of every table whose type is `Table`, `MaterializedView` or `Index`.
///
/// Only the `TableId` column is selected from the matching rows.
pub async fn list_time_travel_table_ids(&self) -> MetaResult<Vec<TableId>> {
    // Table types accepted by the filter below; rows of any other type are not returned.
    let selected_types = vec![
        TableType::Table,
        TableType::MaterializedView,
        TableType::Index,
    ];
    let ids: Vec<TableId> = Table::find()
        .select_only()
        .filter(table::Column::TableType.is_in(selected_types))
        .column(table::Column::TableId)
        .into_tuple()
        .all(&self.db)
        .await?;
    Ok(ids)
}
}

async fn update_internal_tables(
Expand Down
17 changes: 2 additions & 15 deletions src/meta/src/controller/system_param.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use anyhow::anyhow;
use risingwave_common::system_param::common::CommonHandler;
use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_common::system_param::{
check_missing_params, default, derive_missing_fields, set_system_param,
check_missing_params, derive_missing_fields, set_system_param,
};
use risingwave_common::{for_all_params, key_of};
use risingwave_meta_model::prelude::SystemParameter;
Expand Down Expand Up @@ -132,18 +132,6 @@ for_all_params!(impl_system_params_from_db);
for_all_params!(impl_merge_params);
for_all_params!(impl_system_params_to_models);

/// Replaces `time_travel_retention_ms` with its default value when it is unset or zero.
///
/// Any other value is left untouched. The override is reported through `tracing::info!`.
fn apply_hard_code_override(params: &mut PbSystemParams) {
    // Nothing to do when a non-zero retention is already configured.
    if !matches!(params.time_travel_retention_ms, None | Some(0)) {
        return;
    }
    let default_v = default::time_travel_retention_ms();
    tracing::info!("time_travel_retention_ms has been overridden to {default_v}");
    params.time_travel_retention_ms = Some(default_v);
}

impl SystemParamsController {
pub async fn new(
sql_meta_store: SqlMetaStore,
Expand All @@ -152,8 +140,7 @@ impl SystemParamsController {
) -> MetaResult<Self> {
let db = sql_meta_store.conn;
let params = SystemParameter::find().all(&db).await?;
let mut params = merge_params(system_params_from_db(params)?, init_params);
apply_hard_code_override(&mut params);
let params = merge_params(system_params_from_db(params)?, init_params);
tracing::info!(initial_params = ?SystemParamsReader::new(&params), "initialize system parameters");
check_missing_params(&params).map_err(|e| anyhow!(e))?;
let ctl = Self {
Expand Down
17 changes: 10 additions & 7 deletions src/meta/src/hummock/manager/commit_epoch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,12 +247,6 @@ impl HummockManager {
.time_travel_snapshot_interval_counter
.saturating_add(1);
}
let group_parents = version
.latest_version()
.levels
.values()
.map(|g| (g.group_id, g.parent_group_id))
.collect();
let time_travel_tables_to_commit =
table_compaction_group_mapping
.iter()
Expand All @@ -261,13 +255,22 @@ impl HummockManager {
.get(table_id)
.map(|committed_epoch| (table_id, cg_id, *committed_epoch))
});
let time_travel_table_ids: HashSet<_> = self
.metadata_manager
.catalog_controller
.list_time_travel_table_ids()
.await
.map_err(|e| Error::Internal(e.into()))?
.into_iter()
.map(|id| id.try_into().unwrap())
.collect();
let mut txn = self.env.meta_store_ref().conn.begin().await?;
let version_snapshot_sst_ids = self
.write_time_travel_metadata(
&txn,
time_travel_version,
time_travel_delta,
&group_parents,
time_travel_table_ids,
&versioning.last_time_travel_snapshot_sst_ids,
time_travel_tables_to_commit,
)
Expand Down
90 changes: 42 additions & 48 deletions src/meta/src/hummock/manager/time_travel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ use std::collections::{HashMap, HashSet, VecDeque};

use anyhow::anyhow;
use risingwave_common::catalog::TableId;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::epoch::Epoch;
use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::time_travel::{
refill_version, IncompleteHummockVersion, IncompleteHummockVersionDelta,
Expand Down Expand Up @@ -380,20 +381,19 @@ impl HummockManager {
txn: &DatabaseTransaction,
version: Option<&HummockVersion>,
delta: HummockVersionDelta,
group_parents: &HashMap<CompactionGroupId, CompactionGroupId>,
time_travel_table_ids: HashSet<StateTableId>,
skip_sst_ids: &HashSet<HummockSstableId>,
tables_to_commit: impl Iterator<Item = (&TableId, &CompactionGroupId, u64)>,
) -> Result<Option<HashSet<HummockSstableId>>> {
let select_groups = group_parents
.iter()
.filter_map(|(cg_id, _)| {
if should_ignore_group(find_root_group(*cg_id, group_parents)) {
None
} else {
Some(*cg_id)
}
})
.collect::<HashSet<_>>();
if self
.env
.system_params_reader()
.await
.time_travel_retention_ms()
== 0
{
return Ok(None);
}
async fn write_sstable_infos(
mut sst_infos: impl Iterator<Item = &SstableInfo>,
txn: &DatabaseTransaction,
Expand Down Expand Up @@ -428,10 +428,7 @@ impl HummockManager {
Ok(count)
}

for (table_id, cg_id, committed_epoch) in tables_to_commit {
if !select_groups.contains(cg_id) {
continue;
}
for (table_id, _cg_id, committed_epoch) in tables_to_commit {
let version_id: u64 = delta.id.to_u64();
let m = hummock_epoch_to_version::ActiveModel {
epoch: Set(committed_epoch.try_into().unwrap()),
Expand All @@ -446,16 +443,28 @@ impl HummockManager {

let mut version_sst_ids = None;
if let Some(version) = version {
// `version_sst_ids` is used to update `last_time_travel_snapshot_sst_ids`.
version_sst_ids = Some(
version
.get_sst_infos_from_groups(&select_groups)
.map(|s| s.sst_id)
.get_sst_infos()
.filter_map(|s| {
if s.table_ids
.iter()
.any(|tid| time_travel_table_ids.contains(tid))
{
return Some(s.sst_id);
}
None
})
.collect(),
);
write_sstable_infos(
version
.get_sst_infos_from_groups(&select_groups)
.filter(|s| !skip_sst_ids.contains(&s.sst_id)),
version.get_sst_infos().filter(|s| {
!skip_sst_ids.contains(&s.sst_id)
&& s.table_ids
.iter()
.any(|tid| time_travel_table_ids.contains(tid))
}),
txn,
self.env.opts.hummock_time_travel_sst_info_insert_batch_size,
)
Expand All @@ -465,19 +474,24 @@ impl HummockManager {
version.id.to_u64(),
)
.unwrap()),
version: Set((&IncompleteHummockVersion::from((version, &select_groups))
.to_protobuf())
.into()),
version: Set(
(&IncompleteHummockVersion::from((version, &time_travel_table_ids))
.to_protobuf())
.into(),
),
};
hummock_time_travel_version::Entity::insert(m)
.on_conflict_do_nothing()
.exec(txn)
.await?;
}
let written = write_sstable_infos(
delta
.newly_added_sst_infos(Some(&select_groups))
.filter(|s| !skip_sst_ids.contains(&s.sst_id)),
delta.newly_added_sst_infos().filter(|s| {
!skip_sst_ids.contains(&s.sst_id)
&& s.table_ids
.iter()
.any(|tid| time_travel_table_ids.contains(tid))
}),
txn,
self.env.opts.hummock_time_travel_sst_info_insert_batch_size,
)
Expand All @@ -491,7 +505,7 @@ impl HummockManager {
.unwrap()),
version_delta: Set((&IncompleteHummockVersionDelta::from((
&delta,
&select_groups,
&time_travel_table_ids,
))
.to_protobuf())
.into()),
Expand Down Expand Up @@ -531,26 +545,6 @@ fn replay_archive(
last_version
}

fn find_root_group(
group_id: CompactionGroupId,
parents: &HashMap<CompactionGroupId, CompactionGroupId>,
) -> CompactionGroupId {
let mut root = group_id;
while let Some(parent) = parents.get(&root)
&& *parent != 0
{
root = *parent;
}
root
}

/// Returns `true` when `root_group_id` equals `StaticCompactionGroupId::StateDefault`.
///
/// Some intermediate groups may have been dropped, in which case it is impossible to tell
/// whether the root group is MaterializedView or not. Such groups are treated as
/// MaterializedView for correctness.
fn should_ignore_group(root_group_id: CompactionGroupId) -> bool {
    let state_default = StaticCompactionGroupId::StateDefault as CompactionGroupId;
    state_default == root_group_id
}

pub fn require_sql_meta_store_err() -> Error {
Error::TimeTravel(anyhow!("require SQL meta store"))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,35 +84,6 @@ impl HummockVersion {
.map(|s| s.sst_id)
}

/// Iterates over the sst infos of the compaction groups listed in `select_group`, followed by
/// every sst info referenced by the table change logs.
///
/// The change-log part is not filtered by `select_group`, so `get_sst_infos_from_groups` doesn't
/// guarantee that all returned sst infos belong to `select_group`; it is just a hint.
/// This is kept separate from `get_sst_infos` because `get_sst_infos_from_groups` may be
/// further customized in the future.
pub fn get_sst_infos_from_groups<'a>(
    &'a self,
    select_group: &'a HashSet<CompactionGroupId>,
) -> impl Iterator<Item = &'a SstableInfo> + 'a {
    // Ssts of the selected groups: L0 sub-levels in reverse order, then the remaining levels.
    let group_ssts = self
        .levels
        .iter()
        .filter_map(|(cg_id, group)| select_group.contains(cg_id).then_some(group))
        .flat_map(|group| group.l0.sub_levels.iter().rev().chain(group.levels.iter()))
        .flat_map(|level| level.table_infos.iter());
    // TODO: optimization: strip table change log
    let change_log_ssts = self.table_change_log.values().flat_map(|change_log| {
        change_log.0.iter().flat_map(|epoch_change_log| {
            epoch_change_log
                .old_value
                .iter()
                .chain(epoch_change_log.new_value.iter())
        })
    });
    group_ssts.chain(change_log_ssts)
}

pub fn level_iter<F: FnMut(&Level) -> bool>(
&self,
compaction_group_id: CompactionGroupId,
Expand Down
Loading

0 comments on commit 91bada1

Please sign in to comment.