diff --git a/src/ctl/src/cmd_impl/hummock/list_version_deltas.rs b/src/ctl/src/cmd_impl/hummock/list_version_deltas.rs index d67c0ee169085..c9ce2f2c65a77 100644 --- a/src/ctl/src/cmd_impl/hummock/list_version_deltas.rs +++ b/src/ctl/src/cmd_impl/hummock/list_version_deltas.rs @@ -12,13 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -use risingwave_hummock_sdk::HummockEpoch; +use risingwave_hummock_sdk::{HummockEpoch, HummockVersionId}; use crate::CtlContext; pub async fn list_version_deltas( context: &CtlContext, - start_id: u64, + start_id: HummockVersionId, num_epochs: u32, ) -> anyhow::Result<()> { let meta_client = context.meta_client().await?; diff --git a/src/ctl/src/cmd_impl/hummock/validate_version.rs b/src/ctl/src/cmd_impl/hummock/validate_version.rs index 4d857ad8b91f1..a6e6f477f0064 100644 --- a/src/ctl/src/cmd_impl/hummock/validate_version.rs +++ b/src/ctl/src/cmd_impl/hummock/validate_version.rs @@ -63,7 +63,7 @@ async fn get_archive( pub async fn print_user_key_in_archive( context: &CtlContext, - archive_ids: Vec, + archive_ids: impl IntoIterator, data_dir: String, user_key: String, use_new_object_prefix_strategy: bool, @@ -169,7 +169,7 @@ async fn print_user_key_in_sst( pub async fn print_version_delta_in_archive( context: &CtlContext, - archive_ids: Vec, + archive_ids: impl IntoIterator, data_dir: String, sst_id: HummockSstableObjectId, use_new_object_prefix_strategy: bool, diff --git a/src/ctl/src/cmd_impl/meta/migration.rs b/src/ctl/src/cmd_impl/meta/migration.rs index ed82f57af2af1..fb786a505d16c 100644 --- a/src/ctl/src/cmd_impl/meta/migration.rs +++ b/src/ctl/src/cmd_impl/meta/migration.rs @@ -739,8 +739,8 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an version_delta .into_iter() .map(|vd| hummock_version_delta::ActiveModel { - id: Set(vd.id as _), - prev_id: Set(vd.prev_id as _), + id: Set(vd.id.to_u64() as _), + prev_id: Set(vd.prev_id.to_u64() as _), max_committed_epoch: Set(vd.max_committed_epoch as _), safe_epoch: Set(vd.visible_table_safe_epoch() as _), trivial_move: Set(vd.trivial_move), diff --git a/src/ctl/src/lib.rs b/src/ctl/src/lib.rs index 9a82f444079a0..5cc0765e16c81 100644 --- a/src/ctl/src/lib.rs +++ b/src/ctl/src/lib.rs @@ -21,7 +21,7 @@ use cmd_impl::bench::BenchCommands; use cmd_impl::hummock::SstDumpArgs; use itertools::Itertools; use risingwave_common::util::tokio_util::sync::CancellationToken; -use risingwave_hummock_sdk::HummockEpoch; +use risingwave_hummock_sdk::{HummockEpoch, HummockVersionId}; use risingwave_meta::backup_restore::RestoreOpts; use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::CompressionAlgorithm; use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability; @@ -594,7 +594,12 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> { start_id, num_epochs, }) => { - cmd_impl::hummock::list_version_deltas(context, start_id, num_epochs).await?; + cmd_impl::hummock::list_version_deltas( + context, + HummockVersionId::new(start_id), + num_epochs, + ) + .await?; } Commands::Hummock(HummockCommands::ListKv { epoch, @@ -737,7 +742,7 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> { }) => { cmd_impl::hummock::print_version_delta_in_archive( context, - archive_ids, + archive_ids.into_iter().map(HummockVersionId::new), data_dir, sst_id, use_new_object_prefix_strategy, @@ -752,7 +757,7 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> { }) => { cmd_impl::hummock::print_user_key_in_archive( context, - archive_ids, + archive_ids.into_iter().map(HummockVersionId::new), data_dir, user_key, use_new_object_prefix_strategy, diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_version.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_version.rs index d4281434c962f..0c45c2f37ad5e 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_version.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_version.rs @@ -100,7 +100,7 @@ fn version_to_compaction_group_rows(version: &HummockVersion) -> Vec Result Result> { // FIXME #8612: there can be lots of version deltas, so better to fetch them by pages and refactor `SysRowSeqScanExecutor` to yield multiple chunks. - self.0.list_version_deltas(0, u32::MAX, u64::MAX).await + self.0 + .list_version_deltas(HummockVersionId::new(0), u32::MAX, u64::MAX) + .await } async fn list_branched_objects(&self) -> Result> { diff --git a/src/meta/service/src/hummock_service.rs b/src/meta/service/src/hummock_service.rs index 579c6ffda7de0..f2a933bb57295 100644 --- a/src/meta/service/src/hummock_service.rs +++ b/src/meta/service/src/hummock_service.rs @@ -21,6 +21,7 @@ use itertools::Itertools; use risingwave_common::catalog::{TableId, SYS_CATALOG_START_ID}; use risingwave_hummock_sdk::key_range::KeyRange; use risingwave_hummock_sdk::version::HummockVersionDelta; +use risingwave_hummock_sdk::HummockVersionId; use risingwave_meta::manager::MetadataManager; use risingwave_pb::hummock::get_compaction_score_response::PickerInfo; use risingwave_pb::hummock::hummock_manager_service_server::HummockManagerService; @@ -75,7 +76,10 @@ impl HummockManagerService for HummockServiceImpl { ) -> Result, Status> { let req = request.into_inner(); self.hummock_manager - .unpin_version_before(req.context_id, req.unpin_version_before) + .unpin_version_before( + req.context_id, + HummockVersionId::new(req.unpin_version_before), + ) .await?; Ok(Response::new(UnpinVersionBeforeResponse { status: None })) } @@ -114,7 +118,10 @@ impl HummockManagerService for HummockServiceImpl { ) -> Result, Status> { let req = request.into_inner(); self.hummock_manager - .trigger_compaction_deterministic(req.version_id, req.compaction_groups) + .trigger_compaction_deterministic( + HummockVersionId::new(req.version_id), + req.compaction_groups, + ) .await?; Ok(Response::new(TriggerCompactionDeterministicResponse {})) } @@ -136,7 +143,11 @@ impl HummockManagerService for HummockServiceImpl { let req = request.into_inner(); let version_deltas = self .hummock_manager - .list_version_deltas(req.start_id, req.num_limit, req.committed_epoch_limit) + .list_version_deltas( + HummockVersionId::new(req.start_id), + req.num_limit, + req.committed_epoch_limit, + ) .await?; let resp = ListVersionDeltasResponse { version_deltas: Some(PbHummockVersionDeltas { diff --git a/src/meta/src/backup_restore/meta_snapshot_builder.rs b/src/meta/src/backup_restore/meta_snapshot_builder.rs index 6696bd534b78a..ced74ced833fb 100644 --- a/src/meta/src/backup_restore/meta_snapshot_builder.rs +++ b/src/meta/src/backup_restore/meta_snapshot_builder.rs @@ -174,6 +174,7 @@ mod tests { use risingwave_backup::meta_snapshot_v1::MetaSnapshotV1; use risingwave_common::system_param::system_params_for_test; use risingwave_hummock_sdk::version::HummockVersion; + use risingwave_hummock_sdk::HummockVersionId; use risingwave_pb::hummock::HummockVersionStats; use crate::backup_restore::meta_snapshot_builder; @@ -190,13 +191,13 @@ mod tests { let mut builder = MetaSnapshotBuilder::new(meta_store.clone()); let mut hummock_version = HummockVersion::default(); - hummock_version.id = 1; + hummock_version.id = HummockVersionId::new(1); let get_ckpt_builder = |v: &HummockVersion| { let v_ = v.clone(); async move { v_ } }; let hummock_version_stats = HummockVersionStats { - hummock_version_id: hummock_version.id, + hummock_version_id: hummock_version.id.to_u64(), ..Default::default() }; hummock_version_stats.insert(&meta_store).await.unwrap(); @@ -250,7 +251,7 @@ mod tests { snapshot.metadata.default_cf.values().cloned().collect_vec(), vec![vec![100]] ); - assert_eq!(snapshot.metadata.hummock_version.id, 1); + assert_eq!(snapshot.metadata.hummock_version.id.to_u64(), 1); assert_eq!(snapshot.metadata.version_stats.hummock_version_id, 1); } } diff --git a/src/meta/src/backup_restore/restore.rs b/src/meta/src/backup_restore/restore.rs index bc22c74c39442..f0cdc514780e2 100644 --- a/src/meta/src/backup_restore/restore.rs +++ b/src/meta/src/backup_restore/restore.rs @@ -227,6 +227,7 @@ mod tests { use risingwave_backup::storage::MetaSnapshotStorage; use risingwave_common::config::{MetaBackend, SystemConfig}; use risingwave_hummock_sdk::version::HummockVersion; + use risingwave_hummock_sdk::HummockVersionId; use risingwave_pb::hummock::HummockVersionStats; use risingwave_pb::meta::SystemParams; @@ -289,7 +290,7 @@ mod tests { metadata: ClusterMetadata { hummock_version: { let mut version = HummockVersion::default(); - version.id = 123; + version.id = HummockVersionId::new(123); version }, system_param: system_param.clone(), @@ -472,7 +473,7 @@ mod tests { ]), hummock_version: { let mut version = HummockVersion::default(); - version.id = 123; + version.id = HummockVersionId::new(123); version }, system_param: system_param.clone(), diff --git a/src/meta/src/hummock/manager/checkpoint.rs b/src/meta/src/hummock/manager/checkpoint.rs index bf63d3eed9690..bc3701a6b9d82 100644 --- a/src/meta/src/hummock/manager/checkpoint.rs +++ b/src/meta/src/hummock/manager/checkpoint.rs @@ -53,7 +53,7 @@ impl HummockVersionCheckpoint { stale_objects: checkpoint .stale_objects .iter() - .map(|(version_id, objects)| (*version_id as HummockVersionId, objects.clone())) + .map(|(version_id, objects)| (HummockVersionId::new(*version_id), objects.clone())) .collect(), } } @@ -61,7 +61,11 @@ impl HummockVersionCheckpoint { pub fn to_protobuf(&self) -> PbHummockVersionCheckpoint { PbHummockVersionCheckpoint { version: Some(PbHummockVersion::from(&self.version)), - stale_objects: self.stale_objects.clone(), + stale_objects: self + .stale_objects + .iter() + .map(|(version_id, objects)| (version_id.to_u64(), objects.clone())) + .collect(), } } } @@ -245,7 +249,7 @@ impl HummockManager { timer.observe_duration(); self.metrics .checkpoint_version_id - .set(new_checkpoint_id as i64); + .set(new_checkpoint_id.to_u64() as i64); Ok(new_checkpoint_id - old_checkpoint_id) } diff --git a/src/meta/src/hummock/manager/context.rs b/src/meta/src/hummock/manager/context.rs index 6bfcd119b89d7..b2e0b30771f98 100644 --- a/src/meta/src/hummock/manager/context.rs +++ b/src/meta/src/hummock/manager/context.rs @@ -283,15 +283,15 @@ impl HummockManager { context_id, HummockPinnedVersion { context_id, - min_pinned_id: INVALID_VERSION_ID, + min_pinned_id: INVALID_VERSION_ID.to_u64(), }, ); let version_id = versioning.current_version.id; let ret = versioning.current_version.clone(); - if context_pinned_version.min_pinned_id == INVALID_VERSION_ID - || context_pinned_version.min_pinned_id > version_id + if HummockVersionId::new(context_pinned_version.min_pinned_id) == INVALID_VERSION_ID + || HummockVersionId::new(context_pinned_version.min_pinned_id) > version_id { - context_pinned_version.min_pinned_id = version_id; + context_pinned_version.min_pinned_id = version_id.to_u64(); commit_multi_var!(self.meta_store_ref(), context_pinned_version)?; trigger_pin_unpin_version_state(&self.metrics, &context_info.pinned_versions); } @@ -327,12 +327,12 @@ impl HummockManager { }, ); assert!( - context_pinned_version.min_pinned_id <= unpin_before, + context_pinned_version.min_pinned_id <= unpin_before.to_u64(), "val must be monotonically non-decreasing. old = {}, new = {}.", context_pinned_version.min_pinned_id, unpin_before ); - context_pinned_version.min_pinned_id = unpin_before; + context_pinned_version.min_pinned_id = unpin_before.to_u64(); commit_multi_var!(self.meta_store_ref(), context_pinned_version)?; trigger_pin_unpin_version_state(&self.metrics, &context_info.pinned_versions); @@ -492,10 +492,10 @@ impl HummockManager { fn trigger_safepoint_stat(metrics: &MetaMetrics, safepoints: &[HummockVersionId]) { if let Some(sp) = safepoints.iter().min() { - metrics.min_safepoint_version_id.set(*sp as _); + metrics.min_safepoint_version_id.set(sp.to_u64() as _); } else { metrics .min_safepoint_version_id - .set(HummockVersionId::MAX as _); + .set(HummockVersionId::MAX.to_u64() as _); } } diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index 3d7fd1ee58ee8..3ab2e02f026df 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -380,7 +380,7 @@ impl HummockManager { .into_iter() .map(|m| { ( - m.id as HummockVersionId, + HummockVersionId::new(m.id as _), HummockVersionDelta::from_persisted_protobuf( &PbHummockVersionDelta::from(m), ), diff --git a/src/meta/src/hummock/manager/tests.rs b/src/meta/src/hummock/manager/tests.rs index a0780641a3f30..a35dc5c4b077f 100644 --- a/src/meta/src/hummock/manager/tests.rs +++ b/src/meta/src/hummock/manager/tests.rs @@ -538,7 +538,7 @@ async fn test_hummock_manager_basic() { ); for _ in 0..2 { hummock_manager - .unpin_version_before(context_id_1, u64::MAX) + .unpin_version_before(context_id_1, HummockVersionId::MAX) .await .unwrap(); assert_eq!( @@ -571,8 +571,8 @@ async fn test_hummock_manager_basic() { ); // pinned by context_id_1 assert_eq!( - hummock_manager.get_min_pinned_version_id().await, - init_version_id + commit_log_count + register_log_count - 2, + hummock_manager.get_min_pinned_version_id().await + 2, + init_version_id + commit_log_count + register_log_count, ); } // objects_to_delete is always empty because no compaction is ever invoked. @@ -597,7 +597,7 @@ async fn test_hummock_manager_basic() { ((commit_log_count + register_log_count) as usize, 0) ); hummock_manager - .unpin_version_before(context_id_1, u64::MAX) + .unpin_version_before(context_id_1, HummockVersionId::MAX) .await .unwrap(); assert_eq!( @@ -605,7 +605,7 @@ async fn test_hummock_manager_basic() { init_version_id + commit_log_count + register_log_count ); hummock_manager - .unpin_version_before(context_id_2, u64::MAX) + .unpin_version_before(context_id_2, HummockVersionId::MAX) .await .unwrap(); assert_eq!( diff --git a/src/meta/src/hummock/manager/time_travel.rs b/src/meta/src/hummock/manager/time_travel.rs index e946969324b1a..70035e70aa435 100644 --- a/src/meta/src/hummock/manager/time_travel.rs +++ b/src/meta/src/hummock/manager/time_travel.rs @@ -142,7 +142,8 @@ impl HummockManager { .select_only() .column(hummock_time_travel_version::Column::VersionId) .filter( - hummock_time_travel_version::Column::VersionId.lt(earliest_valid_version_id), + hummock_time_travel_version::Column::VersionId + .lt(earliest_valid_version_id.to_u64()), ) .order_by_desc(hummock_time_travel_version::Column::VersionId) .into_tuple() @@ -152,7 +153,10 @@ impl HummockManager { hummock_time_travel_delta::Entity::find() .select_only() .column(hummock_time_travel_delta::Column::VersionId) - .filter(hummock_time_travel_delta::Column::VersionId.lt(earliest_valid_version_id)) + .filter( + hummock_time_travel_delta::Column::VersionId + .lt(earliest_valid_version_id.to_u64()), + ) .into_tuple() .all(&txn) .await?; @@ -212,23 +216,28 @@ impl HummockManager { } let res = hummock_time_travel_version::Entity::delete_many() - .filter(hummock_time_travel_version::Column::VersionId.lt(earliest_valid_version_id)) + .filter( + hummock_time_travel_version::Column::VersionId + .lt(earliest_valid_version_id.to_u64()), + ) .exec(&txn) .await?; tracing::debug!( - epoch_watermark_version_id = version_watermark.version_id, - earliest_valid_version_id, + epoch_watermark_version_id = ?version_watermark.version_id, + ?earliest_valid_version_id, "delete {} rows from hummock_time_travel_version", res.rows_affected ); let res = hummock_time_travel_delta::Entity::delete_many() - .filter(hummock_time_travel_delta::Column::VersionId.lt(earliest_valid_version_id)) + .filter( + hummock_time_travel_delta::Column::VersionId.lt(earliest_valid_version_id.to_u64()), + ) .exec(&txn) .await?; tracing::debug!( - epoch_watermark_version_id = version_watermark.version_id, - earliest_valid_version_id, + epoch_watermark_version_id = ?version_watermark.version_id, + ?earliest_valid_version_id, "delete {} rows from hummock_time_travel_delta", res.rows_affected ); @@ -380,7 +389,7 @@ impl HummockManager { } let epoch = delta.max_committed_epoch; - let version_id = delta.id; + let version_id: u64 = delta.id.to_u64(); let m = hummock_epoch_to_version::ActiveModel { epoch: Set(epoch.try_into().unwrap()), version_id: Set(version_id.try_into().unwrap()), @@ -415,9 +424,10 @@ impl HummockManager { ) .await?; let m = hummock_time_travel_version::ActiveModel { - version_id: Set( - risingwave_meta_model_v2::HummockVersionId::try_from(version.id).unwrap(), - ), + version_id: Set(risingwave_meta_model_v2::HummockVersionId::try_from( + version.id.to_u64(), + ) + .unwrap()), version: Set((&IncompleteHummockVersion::from((version, &select_groups)) .to_protobuf()) .into()), @@ -442,9 +452,10 @@ impl HummockManager { // Ignore delta which adds no data. if written > 0 { let m = hummock_time_travel_delta::ActiveModel { - version_id: Set( - risingwave_meta_model_v2::HummockVersionId::try_from(delta.id).unwrap(), - ), + version_id: Set(risingwave_meta_model_v2::HummockVersionId::try_from( + delta.id.to_u64(), + ) + .unwrap()), version_delta: Set((&IncompleteHummockVersionDelta::from(( &delta, &select_groups, @@ -483,7 +494,7 @@ fn replay_archive( // Need to work around the assertion in `apply_version_delta`. // Because compaction deltas are not included in time travel archive. while last_version.id < d.prev_id { - last_version.id += 1; + last_version.id = last_version.id + 1; } last_version.apply_version_delta(&d); } diff --git a/src/meta/src/hummock/manager/transaction.rs b/src/meta/src/hummock/manager/transaction.rs index 9833dbea6e3af..f14aa070d586e 100644 --- a/src/meta/src/hummock/manager/transaction.rs +++ b/src/meta/src/hummock/manager/transaction.rs @@ -50,7 +50,9 @@ fn trigger_version_stat(metrics: &MetaMetrics, current_version: &HummockVersion) metrics .safe_epoch .set(current_version.visible_table_safe_epoch() as i64); - metrics.current_version_id.set(current_version.id as i64); + metrics + .current_version_id + .set(current_version.id.to_u64() as i64); } pub(super) struct HummockVersionTransaction<'a> { diff --git a/src/meta/src/hummock/manager/versioning.rs b/src/meta/src/hummock/manager/versioning.rs index 1bea45720c1b2..698996c701ac8 100644 --- a/src/meta/src/hummock/manager/versioning.rs +++ b/src/meta/src/hummock/manager/versioning.rs @@ -76,7 +76,7 @@ impl ContextInfo { for id in self .pinned_versions .values() - .map(|v| v.min_pinned_id) + .map(|v| HummockVersionId::new(v.min_pinned_id)) .chain(self.version_safe_points.iter().cloned()) { min_pinned_version_id = cmp::min(id, min_pinned_version_id); @@ -172,7 +172,7 @@ impl HummockManager { #[cfg_attr(coverage, coverage(off))] pub async fn list_version_deltas( &self, - start_id: u64, + start_id: HummockVersionId, num_limit: u32, committed_epoch_limit: HummockEpoch, ) -> Result> { @@ -363,7 +363,7 @@ pub(super) fn calc_new_write_limits( /// Note that the result is approximate value. See `estimate_table_stats`. fn rebuild_table_stats(version: &HummockVersion) -> HummockVersionStats { let mut stats = HummockVersionStats { - hummock_version_id: version.id, + hummock_version_id: version.id.to_u64(), table_stats: Default::default(), }; for level in version.get_combined_levels() { @@ -431,11 +431,13 @@ mod tests { min_pinned_id: 10, }, ); - assert_eq!(context_info.min_pinned_version_id(), 10); - context_info.version_safe_points.push(5); - assert_eq!(context_info.min_pinned_version_id(), 5); + assert_eq!(context_info.min_pinned_version_id().to_u64(), 10); + context_info + .version_safe_points + .push(HummockVersionId::new(5)); + assert_eq!(context_info.min_pinned_version_id().to_u64(), 5); context_info.version_safe_points.clear(); - assert_eq!(context_info.min_pinned_version_id(), 10); + assert_eq!(context_info.min_pinned_version_id().to_u64(), 10); context_info.pinned_versions.clear(); assert_eq!(context_info.min_pinned_version_id(), HummockVersionId::MAX); } @@ -553,7 +555,7 @@ mod tests { } let mut version = HummockVersion::default(); - version.id = 123; + version.id = HummockVersionId::new(123); for cg in 1..3 { version.levels.insert( @@ -571,7 +573,7 @@ mod tests { hummock_version_id, table_stats, } = rebuild_table_stats(&version); - assert_eq!(hummock_version_id, version.id); + assert_eq!(hummock_version_id, version.id.to_u64()); assert_eq!(table_stats.len(), 3); for (tid, stats) in table_stats { assert_eq!( diff --git a/src/meta/src/hummock/metrics_utils.rs b/src/meta/src/hummock/metrics_utils.rs index 0b23a6970965c..b80fd0537659a 100644 --- a/src/meta/src/hummock/metrics_utils.rs +++ b/src/meta/src/hummock/metrics_utils.rs @@ -397,7 +397,7 @@ pub fn trigger_pin_unpin_version_state( } else { metrics .min_pinned_version_id - .set(HummockVersionId::MAX as _); + .set(HummockVersionId::MAX.to_u64() as _); } } diff --git a/src/meta/src/hummock/model/ext/hummock.rs b/src/meta/src/hummock/model/ext/hummock.rs index c82df1013737e..37dae37218fad 100644 --- a/src/meta/src/hummock/model/ext/hummock.rs +++ b/src/meta/src/hummock/model/ext/hummock.rs @@ -221,8 +221,8 @@ impl Transactional for HummockVersionStats { impl Transactional for HummockVersionDelta { async fn upsert_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> { let m = hummock_version_delta::ActiveModel { - id: Set(self.id.try_into().unwrap()), - prev_id: Set(self.prev_id.try_into().unwrap()), + id: Set(self.id.to_u64().try_into().unwrap()), + prev_id: Set(self.prev_id.to_u64().try_into().unwrap()), max_committed_epoch: Set(self.max_committed_epoch.try_into().unwrap()), safe_epoch: Set(self.visible_table_safe_epoch().try_into().unwrap()), trivial_move: Set(self.trivial_move), @@ -246,9 +246,11 @@ impl Transactional for HummockVersionDelta { } async fn delete_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> { - hummock_version_delta::Entity::delete_by_id(HummockVersionId::try_from(self.id).unwrap()) - .exec(trx) - .await?; + hummock_version_delta::Entity::delete_by_id( + HummockVersionId::try_from(self.id.to_u64()).unwrap(), + ) + .exec(trx) + .await?; Ok(()) } } diff --git a/src/meta/src/hummock/model/version_delta.rs b/src/meta/src/hummock/model/version_delta.rs index ed2be4761acaa..3c9182c419cb9 100644 --- a/src/meta/src/hummock/model/version_delta.rs +++ b/src/meta/src/hummock/model/version_delta.rs @@ -13,7 +13,6 @@ // limitations under the License. use risingwave_hummock_sdk::version::HummockVersionDelta; -use risingwave_hummock_sdk::HummockVersionId; use risingwave_pb::hummock::PbHummockVersionDelta; use crate::hummock::model::HUMMOCK_VERSION_DELTA_CF_NAME; @@ -21,7 +20,7 @@ use crate::model::{MetadataModel, MetadataModelResult}; /// `HummockVersionDelta` tracks delta of `Sstables` in given version based on previous version. impl MetadataModel for HummockVersionDelta { - type KeyType = HummockVersionId; + type KeyType = u64; type PbType = PbHummockVersionDelta; fn cf_name() -> String { @@ -37,6 +36,6 @@ impl MetadataModel for HummockVersionDelta { } fn key(&self) -> MetadataModelResult { - Ok(self.id) + Ok(self.id.to_u64()) } } diff --git a/src/meta/src/hummock/model/version_stats.rs b/src/meta/src/hummock/model/version_stats.rs index 512adca422bd5..0afb01555d09f 100644 --- a/src/meta/src/hummock/model/version_stats.rs +++ b/src/meta/src/hummock/model/version_stats.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use risingwave_hummock_sdk::HummockVersionId; use risingwave_pb::hummock::HummockVersionStats; use crate::hummock::model::HUMMOCK_VERSION_STATS_CF_NAME; @@ -21,7 +20,7 @@ use crate::model::{MetadataModel, MetadataModelResult}; /// `HummockVersionStats` stores stats for hummock version. /// Currently it only persists one row for latest version. impl MetadataModel for HummockVersionStats { - type KeyType = HummockVersionId; + type KeyType = u64; type PbType = HummockVersionStats; fn cf_name() -> String { diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index 951ed56f64fed..748e65d1f3af5 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -1054,12 +1054,12 @@ impl MetaClient { pub async fn list_version_deltas( &self, - start_id: u64, + start_id: HummockVersionId, num_limit: u32, committed_epoch_limit: HummockEpoch, ) -> Result> { let req = ListVersionDeltasRequest { - start_id, + start_id: start_id.to_u64(), num_limit, committed_epoch_limit, }; @@ -1081,7 +1081,7 @@ impl MetaClient { compaction_groups: Vec, ) -> Result<()> { let req = TriggerCompactionDeterministicRequest { - version_id, + version_id: version_id.to_u64(), compaction_groups, }; self.inner.trigger_compaction_deterministic(req).await?; @@ -1409,7 +1409,7 @@ impl HummockMetaClient for MetaClient { async fn unpin_version_before(&self, unpin_version_before: HummockVersionId) -> Result<()> { let req = UnpinVersionBeforeRequest { context_id: self.worker_id(), - unpin_version_before, + unpin_version_before: unpin_version_before.to_u64(), }; self.inner.unpin_version_before(req).await?; Ok(()) diff --git a/src/storage/backup/src/lib.rs b/src/storage/backup/src/lib.rs index 6423f9c4dd87b..91d3f6c77ca58 100644 --- a/src/storage/backup/src/lib.rs +++ b/src/storage/backup/src/lib.rs @@ -114,7 +114,7 @@ impl From<&MetaSnapshotMetadata> for PbMetaSnapshotMetadata { fn from(m: &MetaSnapshotMetadata) -> Self { Self { id: m.id, - hummock_version_id: m.hummock_version_id, + hummock_version_id: m.hummock_version_id.to_u64(), max_committed_epoch: m.max_committed_epoch, safe_epoch: m.safe_epoch, format_version: Some(m.format_version), diff --git a/src/storage/backup/src/meta_snapshot_v1.rs b/src/storage/backup/src/meta_snapshot_v1.rs index 9b4145ea25632..b873f863b6d05 100644 --- a/src/storage/backup/src/meta_snapshot_v1.rs +++ b/src/storage/backup/src/meta_snapshot_v1.rs @@ -256,6 +256,7 @@ impl ClusterMetadata { #[cfg(test)] mod tests { + use risingwave_hummock_sdk::HummockVersionId; use risingwave_pb::hummock::{CompactionGroup, TableStats}; use crate::meta_snapshot_v1::{ClusterMetadata, MetaSnapshotV1}; @@ -265,7 +266,7 @@ mod tests { #[test] fn test_snapshot_encoding_decoding() { let mut metadata = ClusterMetadata::default(); - metadata.hummock_version.id = 321; + metadata.hummock_version.id = HummockVersionId::new(321); let raw = MetaSnapshot { format_version: 0, id: 123, @@ -281,7 +282,7 @@ mod tests { let mut buf = vec![]; let mut raw = ClusterMetadata::default(); raw.default_cf.insert(vec![0, 1, 2], vec![3, 4, 5]); - raw.hummock_version.id = 1; + raw.hummock_version.id = HummockVersionId::new(1); raw.version_stats.hummock_version_id = 10; raw.version_stats.table_stats.insert( 200, diff --git a/src/storage/benches/bench_table_watermarks.rs b/src/storage/benches/bench_table_watermarks.rs index 531652065f014..96fda84629656 100644 --- a/src/storage/benches/bench_table_watermarks.rs +++ b/src/storage/benches/bench_table_watermarks.rs @@ -28,7 +28,7 @@ use risingwave_hummock_sdk::table_watermark::{ TableWatermarks, TableWatermarksIndex, VnodeWatermark, WatermarkDirection, }; use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionStateTableInfo}; -use risingwave_hummock_sdk::HummockEpoch; +use risingwave_hummock_sdk::{HummockEpoch, HummockVersionId}; use risingwave_pb::hummock::StateTableInfoDelta; use risingwave_storage::hummock::local_version::pinned_version::PinnedVersion; use spin::Mutex; @@ -117,7 +117,7 @@ fn gen_version( )); let mut version = HummockVersion::default(); let committed_epoch = test_epoch(new_epoch_idx as _); - version.id = new_epoch_idx as _; + version.id = HummockVersionId::new(new_epoch_idx as _); version.max_committed_epoch = committed_epoch; version.table_watermarks = (0..table_count) .map(|table_id| (TableId::new(table_id as _), table_watermarks.clone())) diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index 676472e969aab..3720afc12ff29 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -1340,11 +1340,12 @@ mod tests { use crate::version::{ GroupDelta, GroupDeltas, HummockVersion, HummockVersionDelta, IntraLevelDelta, }; + use crate::HummockVersionId; #[test] fn test_get_sst_object_ids() { let mut version = HummockVersion::default(); - version.id = 0; + version.id = HummockVersionId::new(0); version.levels = HashMap::from_iter([( 0, Levels { @@ -1391,7 +1392,7 @@ mod tests { #[test] fn test_apply_version_delta() { let mut version = HummockVersion::default(); - version.id = 0; + version.id = HummockVersionId::new(0); version.levels = HashMap::from_iter([ ( 0, @@ -1415,7 +1416,7 @@ mod tests { ), ]); let mut version_delta = HummockVersionDelta::default(); - version_delta.id = 1; + version_delta.id = HummockVersionId::new(1); version_delta.group_deltas = HashMap::from_iter([ ( 2, @@ -1474,7 +1475,7 @@ mod tests { }; assert_eq!(version, { let mut version = HummockVersion::default(); - version.id = 1; + version.id = HummockVersionId::new(1); version.levels = HashMap::from_iter([ ( 2, diff --git a/src/storage/hummock_sdk/src/lib.rs b/src/storage/hummock_sdk/src/lib.rs index c894ecf184128..b9cbbc690d591 100644 --- a/src/storage/hummock_sdk/src/lib.rs +++ b/src/storage/hummock_sdk/src/lib.rs @@ -26,10 +26,13 @@ mod key_cmp; use std::cmp::Ordering; use std::collections::HashMap; +use std::fmt::{Display, Formatter}; +use std::ops::{Add, Sub}; pub use key_cmp::*; use risingwave_common::util::epoch::EPOCH_SPILL_TIME_MASK; use risingwave_pb::common::{batch_query_epoch, BatchQueryEpoch}; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; use sstable_info::SstableInfo; use crate::key_range::KeyRangeCommon; @@ -57,13 +60,72 @@ use crate::table_watermark::TableWatermarks; pub type HummockSstableObjectId = u64; pub type HummockSstableId = u64; pub type HummockRefCount = u64; -pub type HummockVersionId = u64; pub type HummockContextId = u32; pub type HummockEpoch = u64; pub type HummockCompactionTaskId = u64; pub type CompactionGroupId = u64; -pub const INVALID_VERSION_ID: HummockVersionId = 0; -pub const FIRST_VERSION_ID: HummockVersionId = 1; + +#[derive(Debug, Clone, PartialEq, Copy, Ord, PartialOrd, Eq, Hash)] +pub struct HummockVersionId(u64); + +impl Display for HummockVersionId { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} + +impl Serialize for HummockVersionId { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + serializer.serialize_u64(self.0) + } +} + +impl<'de> Deserialize<'de> for HummockVersionId { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + Ok(Self(::deserialize(deserializer)?)) + } +} + +impl HummockVersionId { + pub const MAX: Self = Self(u64::MAX); + + pub const fn new(id: u64) -> Self { + Self(id) + } + + pub fn next(&self) -> Self { + Self(self.0 + 1) + } + + pub fn to_u64(self) -> u64 { + self.0 + } +} + +impl Add for HummockVersionId { + type Output = Self; + + fn add(self, rhs: u64) -> Self::Output { + Self(self.0 + rhs) + } +} + +impl Sub for HummockVersionId { + type Output = u64; + + fn sub(self, rhs: Self) -> Self::Output { + self.0 - rhs.0 + } +} + +pub const INVALID_VERSION_ID: HummockVersionId = HummockVersionId(0); +pub const FIRST_VERSION_ID: HummockVersionId = HummockVersionId(1); pub const SPLIT_TABLE_COMPACTION_GROUP_ID_HEAD: u64 = 1u64 << 56; pub const SINGLE_TABLE_COMPACTION_GROUP_ID_HEAD: u64 = 2u64 << 56; pub const OBJECT_SUFFIX: &str = "data"; diff --git a/src/storage/hummock_sdk/src/time_travel.rs b/src/storage/hummock_sdk/src/time_travel.rs index a870d7d6d549a..1756353de74af 100644 --- a/src/storage/hummock_sdk/src/time_travel.rs +++ b/src/storage/hummock_sdk/src/time_travel.rs @@ -27,14 +27,14 @@ use crate::version::{ GroupDelta, GroupDeltas, HummockVersion, HummockVersionDelta, HummockVersionStateTableInfo, IntraLevelDelta, }; -use crate::{CompactionGroupId, HummockSstableId}; +use crate::{CompactionGroupId, HummockSstableId, HummockVersionId}; /// [`IncompleteHummockVersion`] is incomplete because `SSTableInfo` only has the `sst_id` set in the following fields: /// - `PbLevels` /// - `TableChangeLog` #[derive(Debug, Clone, PartialEq)] pub struct IncompleteHummockVersion { - pub id: u64, + pub id: HummockVersionId, pub levels: HashMap, pub max_committed_epoch: u64, safe_epoch: u64, @@ -236,7 +236,7 @@ impl IncompleteHummockVersion { /// Resulted `SStableInfo` is incompelte. pub fn to_protobuf(&self) -> PbHummockVersion { PbHummockVersion { - id: self.id, + id: self.id.0, levels: self .levels .iter() @@ -264,8 +264,8 @@ impl IncompleteHummockVersion { /// - `ChangeLogDelta` #[derive(Debug, PartialEq, Clone)] pub struct IncompleteHummockVersionDelta { - pub id: u64, - pub prev_id: u64, + pub id: HummockVersionId, + pub prev_id: HummockVersionId, pub group_deltas: HashMap, pub max_committed_epoch: u64, pub safe_epoch: u64, @@ -314,8 +314,8 @@ impl IncompleteHummockVersionDelta { /// Resulted `SStableInfo` is incompelte. pub fn to_protobuf(&self) -> PbHummockVersionDelta { PbHummockVersionDelta { - id: self.id, - prev_id: self.prev_id, + id: self.id.0, + prev_id: self.prev_id.0, group_deltas: self.group_deltas.clone(), max_committed_epoch: self.max_committed_epoch, safe_epoch: self.safe_epoch, diff --git a/src/storage/hummock_sdk/src/version.rs b/src/storage/hummock_sdk/src/version.rs index d38376a7fda0f..5e6ea2f3fe4a7 100644 --- a/src/storage/hummock_sdk/src/version.rs +++ b/src/storage/hummock_sdk/src/version.rs @@ -205,7 +205,7 @@ impl HummockVersionStateTableInfo { #[derive(Debug, Clone, PartialEq)] pub struct HummockVersion { - pub id: u64, + pub id: HummockVersionId, pub levels: HashMap, pub max_committed_epoch: u64, safe_epoch: u64, @@ -258,7 +258,7 @@ impl HummockVersion { impl From<&PbHummockVersion> for HummockVersion { fn from(pb_version: &PbHummockVersion) -> Self { Self { - id: pb_version.id, + id: HummockVersionId(pb_version.id), levels: pb_version .levels .iter() @@ -296,7 +296,7 @@ impl From<&PbHummockVersion> for HummockVersion { impl From<&HummockVersion> for PbHummockVersion { fn from(version: &HummockVersion) -> Self { Self { - id: version.id, + id: version.id.0, levels: version .levels .iter() @@ -322,7 +322,7 @@ impl From<&HummockVersion> for PbHummockVersion { impl From for PbHummockVersion { fn from(version: HummockVersion) -> Self { Self { - id: version.id, + id: version.id.0, levels: version .levels .into_iter() @@ -347,7 +347,7 @@ impl From for PbHummockVersion { impl HummockVersion { pub fn next_version_id(&self) -> HummockVersionId { - self.id + 1 + self.id.next() } pub fn need_fill_backward_compatible_state_table_info_delta(&self) -> bool { @@ -436,8 +436,8 @@ impl HummockVersion { #[derive(Debug, PartialEq, Clone)] pub struct HummockVersionDelta { - pub id: u64, - pub prev_id: u64, + pub id: HummockVersionId, + pub prev_id: HummockVersionId, pub group_deltas: HashMap, pub max_committed_epoch: u64, safe_epoch: u64, @@ -575,8 +575,8 @@ impl HummockVersionDelta { impl From<&PbHummockVersionDelta> for HummockVersionDelta { fn from(pb_version_delta: &PbHummockVersionDelta) -> Self { Self { - id: pb_version_delta.id, - prev_id: pb_version_delta.prev_id, + id: HummockVersionId(pb_version_delta.id), + prev_id: HummockVersionId(pb_version_delta.prev_id), group_deltas: pb_version_delta .group_deltas .iter() @@ -625,8 +625,8 @@ impl From<&PbHummockVersionDelta> for HummockVersionDelta { impl From<&HummockVersionDelta> for PbHummockVersionDelta { fn from(version_delta: &HummockVersionDelta) -> Self { Self { - id: version_delta.id, - prev_id: version_delta.prev_id, + id: version_delta.id.0, + prev_id: version_delta.prev_id.0, group_deltas: version_delta .group_deltas .iter() @@ -662,8 +662,8 @@ impl From<&HummockVersionDelta> for PbHummockVersionDelta { impl From for PbHummockVersionDelta { fn from(version_delta: HummockVersionDelta) -> Self { Self { - id: version_delta.id, - prev_id: version_delta.prev_id, + id: version_delta.id.0, + prev_id: version_delta.prev_id.0, group_deltas: version_delta .group_deltas .into_iter() @@ -699,8 +699,8 @@ impl From for PbHummockVersionDelta { impl From for HummockVersionDelta { fn from(pb_version_delta: PbHummockVersionDelta) -> Self { Self { - id: pb_version_delta.id, - prev_id: pb_version_delta.prev_id, + id: HummockVersionId(pb_version_delta.id), + prev_id: HummockVersionId(pb_version_delta.prev_id), group_deltas: pb_version_delta .group_deltas .into_iter() diff --git a/src/storage/hummock_test/src/bin/replay/replay_impl.rs b/src/storage/hummock_test/src/bin/replay/replay_impl.rs index fe9e5874e328e..7d879392d2876 100644 --- a/src/storage/hummock_test/src/bin/replay/replay_impl.rs +++ b/src/storage/hummock_test/src/bin/replay/replay_impl.rs @@ -21,7 +21,7 @@ use risingwave_common::catalog::TableId; use risingwave_common::util::addr::HostAddr; use risingwave_common_service::{Channel, NotificationClient, ObserverError}; use risingwave_hummock_sdk::key::TableKey; -use risingwave_hummock_sdk::{HummockReadEpoch, SyncResult}; +use risingwave_hummock_sdk::{HummockReadEpoch, HummockVersionId, SyncResult}; use risingwave_hummock_trace::{ GlobalReplay, LocalReplay, LocalReplayRead, ReplayItem, ReplayRead, ReplayStateStore, ReplayWrite, Result, TraceError, TracedBytes, TracedInitOptions, TracedNewLocalOptions, @@ -162,7 +162,9 @@ impl ReplayStateStore for GlobalReplayImpl { // wait till version updated if let Some(prev_version_id) = prev_version_id { - self.store.wait_version_update(prev_version_id).await; + self.store + .wait_version_update(HummockVersionId::new(prev_version_id)) + .await; } Ok(version) } diff --git a/src/storage/src/hummock/event_handler/uploader/test_utils.rs b/src/storage/src/hummock/event_handler/uploader/test_utils.rs index f866ef18f9e22..07a306a2a8df8 100644 --- a/src/storage/src/hummock/event_handler/uploader/test_utils.rs +++ b/src/storage/src/hummock/event_handler/uploader/test_utils.rs @@ -34,7 +34,7 @@ use risingwave_hummock_sdk::key::{FullKey, TableKey}; use risingwave_hummock_sdk::key_range::KeyRange; use risingwave_hummock_sdk::sstable_info::SstableInfo; use risingwave_hummock_sdk::version::HummockVersion; -use risingwave_hummock_sdk::{HummockEpoch, LocalSstableInfo}; +use risingwave_hummock_sdk::{HummockEpoch, HummockVersionId, LocalSstableInfo}; use risingwave_pb::hummock::StateTableInfoDelta; use spin::Mutex; use tokio::spawn; @@ -90,7 +90,7 @@ impl HummockUploader { pub(super) fn test_hummock_version(epoch: HummockEpoch) -> HummockVersion { let mut version = HummockVersion::default(); - version.id = epoch; + version.id = HummockVersionId::new(epoch); version.max_committed_epoch = epoch; version.state_table_info.apply_delta( &HashMap::from_iter([( diff --git a/src/storage/src/hummock/local_version/pinned_version.rs b/src/storage/src/hummock/local_version/pinned_version.rs index a3d7b5544ecfe..8fc6475d0ed2b 100644 --- a/src/storage/src/hummock/local_version/pinned_version.rs +++ b/src/storage/src/hummock/local_version/pinned_version.rs @@ -176,7 +176,7 @@ pub(crate) async fn start_pinned_version_worker( min_execute_interval_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); let mut need_unpin = false; - let mut version_ids_in_use: BTreeMap = BTreeMap::new(); + let mut version_ids_in_use: BTreeMap = BTreeMap::new(); let max_version_pinning_duration_sec = Duration::from_secs(max_version_pinning_duration_sec); // For each run in the loop, accumulate versions to unpin and call unpin RPC once. loop { diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index 23ada8660614b..9b534dee967f6 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -32,7 +32,7 @@ use risingwave_hummock_sdk::key::{ use risingwave_hummock_sdk::sstable_info::SstableInfo; use risingwave_hummock_sdk::table_watermark::TableWatermarksIndex; use risingwave_hummock_sdk::version::HummockVersion; -use risingwave_hummock_sdk::HummockReadEpoch; +use risingwave_hummock_sdk::{HummockReadEpoch, HummockVersionId}; use risingwave_rpc_client::HummockMetaClient; use thiserror_ext::AsReport; use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; @@ -780,7 +780,7 @@ impl HummockStorage { &self.hummock_version_reader } - pub async fn wait_version_update(&self, old_id: u64) -> u64 { + pub async fn wait_version_update(&self, old_id: HummockVersionId) -> HummockVersionId { use tokio::task::yield_now; loop { let cur_id = self.pinned_version.load().id(); diff --git a/src/tests/compaction_test/src/compaction_test_runner.rs b/src/tests/compaction_test/src/compaction_test_runner.rs index de5e747624098..6e1ca43be811f 100644 --- a/src/tests/compaction_test/src/compaction_test_runner.rs +++ b/src/tests/compaction_test/src/compaction_test_runner.rs @@ -33,7 +33,7 @@ use risingwave_common::util::iter_util::ZipEqFast; use risingwave_common::util::tokio_util::sync::CancellationToken; use risingwave_hummock_sdk::key::TableKey; use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta}; -use risingwave_hummock_sdk::{CompactionGroupId, HummockEpoch, FIRST_VERSION_ID}; +use risingwave_hummock_sdk::{CompactionGroupId, HummockEpoch, HummockVersionId, FIRST_VERSION_ID}; use risingwave_pb::common::WorkerType; use risingwave_rpc_client::{HummockMetaClient, MetaClient}; use risingwave_storage::hummock::hummock_meta_client::MonitoredHummockMetaClient; @@ -294,7 +294,7 @@ async fn pull_version_deltas( let (handle, shutdown_tx) = MetaClient::start_heartbeat_loop(meta_client.clone(), Duration::from_millis(1000), vec![]); let res = meta_client - .list_version_deltas(0, u32::MAX, u64::MAX) + .list_version_deltas(HummockVersionId::new(0), u32::MAX, u64::MAX) .await .unwrap(); @@ -643,7 +643,7 @@ async fn open_hummock_iters( } pub async fn check_compaction_results( - version_id: u64, + version_id: HummockVersionId, mut expect_results: BTreeMap, mut actual_results: BTreeMap, ) -> anyhow::Result<()> {