diff --git a/Cargo.lock b/Cargo.lock index d2cb5a29b3284..b5f04b2a910bb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11516,6 +11516,7 @@ dependencies = [ name = "risingwave_hummock_sdk" version = "2.3.0-alpha" dependencies = [ + "bincode 2.0.0-rc.3", "bytes", "easy-ext", "hex", diff --git a/proto/hummock.proto b/proto/hummock.proto index 15f3d61a7cf2b..d4b83930ddb27 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -149,6 +149,9 @@ message TableWatermarks { // The direction of the table watermark. bool is_ascending = 2; + + // The table watermark is non-pk prefix table watermark. + bool is_non_pk_prefix = 3; } message EpochNewChangeLog { @@ -194,6 +197,7 @@ message HummockVersion { map table_watermarks = 5; map table_change_logs = 6; map state_table_info = 7; + // map non_pk_prefix_table_watermarks = 8; } message HummockVersionDelta { diff --git a/src/storage/hummock_sdk/Cargo.toml b/src/storage/hummock_sdk/Cargo.toml index 47042e659a35c..c6dff5afb32d9 100644 --- a/src/storage/hummock_sdk/Cargo.toml +++ b/src/storage/hummock_sdk/Cargo.toml @@ -14,6 +14,7 @@ ignored = ["workspace-hack", "num-traits"] normal = ["workspace-hack"] [dependencies] +bincode = { version = "=2.0.0-rc.3", features = ["serde"] } bytes = "1" easy-ext = "1" hex = "0.4" diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index 6575d69886968..a229ca0a540da 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -135,6 +135,7 @@ pub fn safe_epoch_table_watermarks_impl( Some(TableWatermarks { watermarks: vec![(*first_epoch, first_epoch_watermark.clone())], direction: table_watermarks.direction, + watermark_type: table_watermarks.watermark_type, }) } else { None diff --git a/src/storage/hummock_sdk/src/table_watermark.rs b/src/storage/hummock_sdk/src/table_watermark.rs index 3448b4c7114a7..a30c7d3e28a0d 100644 --- a/src/storage/hummock_sdk/src/table_watermark.rs +++ b/src/storage/hummock_sdk/src/table_watermark.rs @@ -19,6 +19,7 @@ use std::mem::size_of; use std::ops::Bound::{Excluded, Included, Unbounded}; use std::sync::Arc; +use bincode::{Decode, Encode}; use bytes::Bytes; use itertools::Itertools; use risingwave_common::bitmap::{Bitmap, BitmapBuilder}; @@ -408,6 +409,7 @@ pub struct TableWatermarks { // later epoch at the back pub watermarks: Vec<(HummockEpoch, Arc<[VnodeWatermark]>)>, pub direction: WatermarkDirection, + pub watermark_type: WatermarkSerdeType, } impl TableWatermarks { @@ -415,12 +417,14 @@ impl TableWatermarks { epoch: HummockEpoch, watermarks: Vec, direction: WatermarkDirection, + watermark_type: WatermarkSerdeType, ) -> Self { let mut this = Self { direction, watermarks: Vec::new(), + watermark_type, }; - this.add_new_epoch_watermarks(epoch, watermarks.into(), direction); + this.add_new_epoch_watermarks(epoch, watermarks.into(), direction, watermark_type); this } @@ -429,8 +433,11 @@ impl TableWatermarks { epoch: HummockEpoch, watermarks: Arc<[VnodeWatermark]>, direction: WatermarkDirection, + watermark_type: WatermarkSerdeType, ) { assert_eq!(self.direction, direction); + assert_eq!(self.watermark_type, watermark_type); + if let Some((prev_epoch, _)) = self.watermarks.last() { assert!(*prev_epoch < epoch); } @@ -475,6 +482,11 @@ impl TableWatermarks { } else { WatermarkDirection::Descending }, + watermark_type: if pb.is_non_pk_prefix { + WatermarkSerdeType::NonPkPrefix + } else { + WatermarkSerdeType::PkPrefix + }, } } } @@ -482,19 +494,30 @@ impl TableWatermarks { pub fn merge_multiple_new_table_watermarks( table_watermarks_list: impl IntoIterator>, ) -> HashMap { - let mut ret: HashMap>)> = - HashMap::new(); + #[allow(clippy::type_complexity)] + let mut ret: HashMap< + TableId, + ( + WatermarkDirection, + BTreeMap>, + WatermarkSerdeType, + ), + > = HashMap::new(); for table_watermarks in table_watermarks_list { for (table_id, new_table_watermarks) in table_watermarks { let epoch_watermarks = match ret.entry(table_id) { Entry::Occupied(entry) => { - let (direction, epoch_watermarks) = entry.into_mut(); + let (direction, epoch_watermarks, watermark_type) = entry.into_mut(); assert_eq!(&new_table_watermarks.direction, direction); + assert_eq!(&new_table_watermarks.watermark_type, watermark_type); epoch_watermarks } Entry::Vacant(entry) => { - let (_, epoch_watermarks) = - entry.insert((new_table_watermarks.direction, BTreeMap::new())); + let (_, epoch_watermarks, _) = entry.insert(( + new_table_watermarks.direction, + BTreeMap::new(), + new_table_watermarks.watermark_type, + )); epoch_watermarks } }; @@ -507,19 +530,22 @@ pub fn merge_multiple_new_table_watermarks( } } ret.into_iter() - .map(|(table_id, (direction, epoch_watermarks))| { - ( - table_id, - TableWatermarks { - direction, - // ordered from earlier epoch to later epoch - watermarks: epoch_watermarks - .into_iter() - .map(|(epoch, watermarks)| (epoch, Arc::from(watermarks))) - .collect(), - }, - ) - }) + .map( + |(table_id, (direction, epoch_watermarks, watermark_type))| { + ( + table_id, + TableWatermarks { + direction, + // ordered from earlier epoch to later epoch + watermarks: epoch_watermarks + .into_iter() + .map(|(epoch, watermarks)| (epoch, Arc::from(watermarks))) + .collect(), + watermark_type, + }, + ) + }, + ) .collect() } @@ -625,6 +651,7 @@ impl TableWatermarks { *self = TableWatermarks { watermarks: result_epoch_watermark, direction: self.direction, + watermark_type: self.watermark_type, } } } @@ -671,6 +698,11 @@ impl From<&PbTableWatermarks> for TableWatermarks { } else { WatermarkDirection::Descending }, + watermark_type: if pb.is_non_pk_prefix { + WatermarkSerdeType::NonPkPrefix + } else { + WatermarkSerdeType::PkPrefix + }, } } } @@ -690,6 +722,10 @@ impl From<&TableWatermarks> for PbTableWatermarks { WatermarkDirection::Ascending => true, WatermarkDirection::Descending => false, }, + is_non_pk_prefix: match table_watermarks.watermark_type { + WatermarkSerdeType::NonPkPrefix => true, + WatermarkSerdeType::PkPrefix => false, + }, } } } @@ -715,6 +751,11 @@ impl From for TableWatermarks { } else { WatermarkDirection::Descending }, + watermark_type: if pb.is_non_pk_prefix { + WatermarkSerdeType::NonPkPrefix + } else { + WatermarkSerdeType::PkPrefix + }, } } } @@ -734,10 +775,20 @@ impl From for PbTableWatermarks { WatermarkDirection::Ascending => true, WatermarkDirection::Descending => false, }, + is_non_pk_prefix: match table_watermarks.watermark_type { + WatermarkSerdeType::NonPkPrefix => true, + WatermarkSerdeType::PkPrefix => false, + }, } } } +#[derive(Debug, Clone, Copy, PartialEq, Eq, Decode, Encode)] +pub enum WatermarkSerdeType { + PkPrefix, + NonPkPrefix, +} + #[cfg(test)] mod tests { use std::collections::Bound::Included; @@ -758,7 +809,7 @@ mod tests { use crate::key::{is_empty_key_range, prefixed_range_with_vnode, TableKeyRange}; use crate::table_watermark::{ merge_multiple_new_table_watermarks, TableWatermarks, TableWatermarksIndex, VnodeWatermark, - WatermarkDirection, + WatermarkDirection, WatermarkSerdeType, }; use crate::version::HummockVersion; @@ -778,6 +829,7 @@ mod tests { let watermark2 = Bytes::from("watermark2"); let watermark3 = Bytes::from("watermark3"); let watermark4 = Bytes::from("watermark4"); + let watermark_type = WatermarkSerdeType::PkPrefix; let mut table_watermarks = TableWatermarks::single_epoch( epoch1, vec![VnodeWatermark::new( @@ -785,6 +837,7 @@ mod tests { watermark1.clone(), )], direction, + watermark_type, ); let epoch2 = epoch1.next_epoch(); table_watermarks.add_new_epoch_watermarks( @@ -795,6 +848,7 @@ mod tests { )] .into(), direction, + watermark_type, ); let mut table_watermark_checkpoint = table_watermarks.clone(); @@ -807,6 +861,7 @@ mod tests { watermark3.clone(), )], direction, + watermark_type, ); table_watermarks.add_new_epoch_watermarks( epoch3, @@ -816,6 +871,7 @@ mod tests { )] .into(), direction, + watermark_type, ); let epoch4 = epoch3.next_epoch(); let epoch5 = epoch4.next_epoch(); @@ -827,6 +883,7 @@ mod tests { )] .into(), direction, + watermark_type, ); second_table_watermark.add_new_epoch_watermarks( epoch5, @@ -836,6 +893,7 @@ mod tests { )] .into(), direction, + watermark_type, ); table_watermark_checkpoint.apply_new_table_watermarks(&second_table_watermark); @@ -850,6 +908,7 @@ mod tests { let watermark2 = Bytes::from("watermark2"); let watermark3 = Bytes::from("watermark3"); let watermark4 = Bytes::from("watermark4"); + let watermark_type = WatermarkSerdeType::PkPrefix; let mut table_watermarks = TableWatermarks::single_epoch( epoch1, vec![VnodeWatermark::new( @@ -857,6 +916,7 @@ mod tests { watermark1.clone(), )], direction, + watermark_type, ); let epoch2 = epoch1.next_epoch(); table_watermarks.add_new_epoch_watermarks( @@ -867,6 +927,7 @@ mod tests { )] .into(), direction, + watermark_type, ); let epoch3 = epoch2.next_epoch(); table_watermarks.add_new_epoch_watermarks( @@ -877,6 +938,7 @@ mod tests { )] .into(), direction, + watermark_type, ); let epoch4 = epoch3.next_epoch(); let epoch5 = epoch4.next_epoch(); @@ -888,6 +950,7 @@ mod tests { )] .into(), direction, + watermark_type, ); let mut table_watermarks_checkpoint = table_watermarks.clone(); @@ -925,6 +988,7 @@ mod tests { ) ], direction, + watermark_type, } ); @@ -951,6 +1015,7 @@ mod tests { ) ], direction, + watermark_type, } ); @@ -977,6 +1042,7 @@ mod tests { ) ], direction, + watermark_type, } ); @@ -996,6 +1062,7 @@ mod tests { .into() )], direction, + watermark_type, } ); } @@ -1026,6 +1093,7 @@ mod tests { .map(|epoch: u64| epoch_new_watermark(epoch, vec![&bitmap])) .collect(), direction: WatermarkDirection::Ascending, + watermark_type: WatermarkSerdeType::PkPrefix, } } let table1_watermark1 = build_table_watermark(0..3, vec![1, 2, 4]); @@ -1050,6 +1118,7 @@ mod tests { epoch_new_watermark(5, vec![&build_bitmap(4..6)]), ], direction: WatermarkDirection::Ascending, + watermark_type: WatermarkSerdeType::PkPrefix, }, ); expected.insert(TableId::new(2), table2_watermark); @@ -1235,6 +1304,7 @@ mod tests { .into(), )], direction: WatermarkDirection::Ascending, + watermark_type: WatermarkSerdeType::PkPrefix, } .into(), ); diff --git a/src/storage/hummock_test/src/compactor_tests.rs b/src/storage/hummock_test/src/compactor_tests.rs index 94176bcb49e51..d0fedd7ed7824 100644 --- a/src/storage/hummock_test/src/compactor_tests.rs +++ b/src/storage/hummock_test/src/compactor_tests.rs @@ -39,7 +39,7 @@ pub(crate) mod tests { use risingwave_hummock_sdk::sstable_info::SstableInfo; use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map; use risingwave_hummock_sdk::table_watermark::{ - ReadTableWatermark, TableWatermarks, VnodeWatermark, WatermarkDirection, + ReadTableWatermark, TableWatermarks, VnodeWatermark, WatermarkDirection, WatermarkSerdeType, }; use risingwave_hummock_sdk::version::HummockVersion; use risingwave_hummock_sdk::{can_concat, CompactionGroupId}; @@ -1773,6 +1773,7 @@ pub(crate) mod tests { vec![VnodeWatermark::new(bitmap.clone(), watermark_key.clone())].into(), )], direction: WatermarkDirection::Ascending, + watermark_type: WatermarkSerdeType::PkPrefix, }, ); diff --git a/src/storage/hummock_trace/src/opts.rs b/src/storage/hummock_trace/src/opts.rs index aede95fbd25f6..417a25e77829e 100644 --- a/src/storage/hummock_trace/src/opts.rs +++ b/src/storage/hummock_trace/src/opts.rs @@ -18,6 +18,7 @@ use risingwave_common::bitmap::Bitmap; use risingwave_common::cache::CachePriority; use risingwave_common::catalog::{TableId, TableOption}; use risingwave_common::util::epoch::EpochPair; +use risingwave_hummock_sdk::table_watermark::WatermarkSerdeType; use risingwave_hummock_sdk::{HummockReadEpoch, HummockVersionId}; use risingwave_pb::common::PbBuffer; @@ -263,7 +264,7 @@ pub struct TracedInitOptions { #[derive(Debug, Clone, PartialEq, Eq, Decode, Encode)] pub struct TracedSealCurrentEpochOptions { // The watermark is serialized into protobuf - pub table_watermarks: Option<(bool, Vec>)>, + pub table_watermarks: Option<(bool, Vec>, WatermarkSerdeType)>, pub switch_op_consistency_level: Option, } diff --git a/src/storage/src/hummock/event_handler/uploader/mod.rs b/src/storage/src/hummock/event_handler/uploader/mod.rs index 3160c4526e00f..26a8fc436383f 100644 --- a/src/storage/src/hummock/event_handler/uploader/mod.rs +++ b/src/storage/src/hummock/event_handler/uploader/mod.rs @@ -34,7 +34,7 @@ use risingwave_common::bitmap::BitmapBuilder; use risingwave_common::catalog::TableId; use risingwave_common::must_match; use risingwave_hummock_sdk::table_watermark::{ - TableWatermarks, VnodeWatermark, WatermarkDirection, + TableWatermarks, VnodeWatermark, WatermarkDirection, WatermarkSerdeType, }; use risingwave_hummock_sdk::{HummockEpoch, HummockSstableObjectId, LocalSstableInfo}; use task_manager::{TaskManager, UploadingTaskStatus}; @@ -336,6 +336,7 @@ impl TableUnsyncData { epoch: HummockEpoch, table_watermarks: Vec, direction: WatermarkDirection, + watermark_type: WatermarkSerdeType, ) { if table_watermarks.is_empty() { return; @@ -361,12 +362,17 @@ impl TableUnsyncData { } } match &mut self.table_watermarks { - Some((prev_direction, prev_watermarks)) => { + Some((prev_direction, prev_watermarks, prev_watermark_type)) => { assert_eq!( *prev_direction, direction, "table id {} new watermark direction not match with previous", self.table_id ); + assert_eq!( + *prev_watermark_type, watermark_type, + "table id {} new watermark watermark_type not match with previous", + self.table_id + ); match prev_watermarks.entry(epoch) { Entry::Occupied(mut entry) => { let (prev_watermarks, vnode_bitmap) = entry.get_mut(); @@ -386,6 +392,7 @@ impl TableUnsyncData { self.table_watermarks = Some(( direction, BTreeMap::from_iter([(epoch, (table_watermarks, vnode_bitmap))]), + watermark_type, )); } } @@ -398,20 +405,28 @@ impl UploaderData { table_id: TableId, direction: WatermarkDirection, watermarks: impl Iterator)>, + watermark_type: WatermarkSerdeType, ) { let mut table_watermarks: Option = None; for (epoch, watermarks) in watermarks { match &mut table_watermarks { Some(prev_watermarks) => { + assert_eq!(prev_watermarks.direction, direction); + assert_eq!(prev_watermarks.watermark_type, watermark_type); prev_watermarks.add_new_epoch_watermarks( epoch, Arc::from(watermarks), direction, + watermark_type, ); } None => { - table_watermarks = - Some(TableWatermarks::single_epoch(epoch, watermarks, direction)); + table_watermarks = Some(TableWatermarks::single_epoch( + epoch, + watermarks, + direction, + watermark_type, + )); } } } @@ -620,6 +635,7 @@ struct TableUnsyncData { table_watermarks: Option<( WatermarkDirection, BTreeMap, BitmapBuilder)>, + WatermarkSerdeType, )>, spill_tasks: BTreeMap>, unsync_epochs: BTreeMap, @@ -663,6 +679,7 @@ impl TableUnsyncData { Option<( WatermarkDirection, impl Iterator)>, + WatermarkSerdeType, )>, impl Iterator, BTreeMap, @@ -684,14 +701,11 @@ impl TableUnsyncData { .map(move |(instance_id, data)| (*instance_id, data.sync(epoch))), self.table_watermarks .as_mut() - .map(|(direction, watermarks)| { - let watermarks = take_before_epoch(watermarks, epoch); - ( - *direction, - watermarks - .into_iter() - .map(|(epoch, (watermarks, _))| (epoch, watermarks)), - ) + .map(|(direction, watermarks, watermark_type)| { + let watermarks = take_before_epoch(watermarks, epoch) + .into_iter() + .map(|(epoch, (watermarks, _))| (epoch, watermarks)); + (*direction, watermarks, *watermark_type) }), take_before_epoch(&mut self.spill_tasks, epoch) .into_values() @@ -729,7 +743,7 @@ impl TableUnsyncData { self.instance_data .values() .for_each(|instance_data| instance_data.assert_after_epoch(epoch)); - if let Some((_, watermarks)) = &self.table_watermarks + if let Some((_, watermarks, _)) = &self.table_watermarks && let Some((oldest_epoch, _)) = watermarks.first_key_value() { assert_gt!(*oldest_epoch, epoch); @@ -884,8 +898,8 @@ impl UnsyncData { table_data.stopped_next_epoch = Some(next_epoch); } } - if let Some((direction, table_watermarks)) = opts.table_watermarks { - table_data.add_table_watermarks(epoch, table_watermarks, direction); + if let Some((direction, table_watermarks, watermark_type)) = opts.table_watermarks { + table_data.add_table_watermarks(epoch, table_watermarks, direction, watermark_type); } } @@ -997,12 +1011,13 @@ impl UploaderData { flush_payload.insert(instance_id, payload); } } - if let Some((direction, watermarks)) = table_watermarks { + if let Some((direction, watermarks, watermark_type)) = table_watermarks { Self::add_table_watermarks( &mut all_table_watermarks, *table_id, direction, watermarks, + watermark_type, ); } for task_id in task_ids { diff --git a/src/storage/src/hummock/iterator/skip_watermark.rs b/src/storage/src/hummock/iterator/skip_watermark.rs index 61294ffe6a438..7125a41500aa8 100644 --- a/src/storage/src/hummock/iterator/skip_watermark.rs +++ b/src/storage/src/hummock/iterator/skip_watermark.rs @@ -240,8 +240,16 @@ impl SkipWatermarkState { return direction.filter_by_watermark_key(inner_key, watermark); } WatermarkSerdeType::Serde(watermark) => { - let (pk_prefix_serde, watermark_col_serde) = - self.last_serde.as_ref().unwrap(); + let (pk_prefix_serde, watermark_col_serde) = match self.last_serde.as_ref() + { + Some(serde) => serde, + None => { + self.last_serde = self + .compaction_catalog_agent_ref + .watermark_serde(table_id.table_id()); + self.last_serde.as_ref().unwrap() + } + }; let row = pk_prefix_serde .deserialize(inner_key) .unwrap_or_else(|_| { @@ -421,23 +429,31 @@ impl SkipWatermarkState { #[cfg(test)] mod tests { - use std::collections::BTreeMap; + use std::collections::{BTreeMap, HashMap}; use std::iter::{empty, once}; + use std::sync::Arc; use bytes::Bytes; use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_common::hash::VirtualNode; + use risingwave_common::row::OwnedRow; + use risingwave_common::types::{DataType, ScalarImpl}; use risingwave_common::util::epoch::test_epoch; + use risingwave_common::util::row_serde::OrderedRowSerde; + use risingwave_common::util::sort_util::OrderType; use risingwave_hummock_sdk::key::{gen_key_from_str, FullKey, TableKey, UserKey}; use risingwave_hummock_sdk::table_watermark::{ReadTableWatermark, WatermarkDirection}; use risingwave_hummock_sdk::EpochWithGap; - use crate::compaction_catalog_manager::CompactionCatalogAgent; + use crate::compaction_catalog_manager::{ + CompactionCatalogAgent, FilterKeyExtractorImpl, FullKeyFilterKeyExtractor, + }; use crate::hummock::iterator::{HummockIterator, SkipWatermarkIterator}; use crate::hummock::shared_buffer::shared_buffer_batch::{ SharedBufferBatch, SharedBufferValue, }; + use crate::row_serde::row_serde_util::{serialize_pk, serialize_pk_with_vnode}; const EPOCH: u64 = test_epoch(1); const TABLE_ID: TableId = TableId::new(233); @@ -515,7 +531,8 @@ mod tests { watermarks: impl IntoIterator, direction: WatermarkDirection, ) { - let test_index = [(0, 2), (0, 3), (0, 4), (1, 1), (1, 3), (4, 2), (8, 1)]; + let test_index: [(usize, usize); 7] = + [(0, 2), (0, 3), (0, 4), (1, 1), (1, 3), (4, 2), (8, 1)]; let items = test_index .iter() .map(|(vnode, key_index)| gen_key_value(*vnode, *key_index)) @@ -612,4 +629,112 @@ mod tests { async fn test_advance_multi_vnode() { test_watermark(vec![(1, 2), (8, 0)], WatermarkDirection::Ascending).await; } + + #[tokio::test] + async fn test_non_pk_prefix_watermark() { + let watermark_direction = WatermarkDirection::Ascending; + + let watermark_col_serde = + OrderedRowSerde::new(vec![DataType::Int32], vec![OrderType::ascending()]); + let pk_serde = OrderedRowSerde::new( + vec![DataType::Int32, DataType::Int32], + vec![OrderType::ascending(), OrderType::ascending()], + ); + + fn gen_key_value( + vnode: usize, + index: usize, + pk_serde: &OrderedRowSerde, + ) -> (TableKey, SharedBufferValue) { + let r1 = OwnedRow::new(vec![ + Some(ScalarImpl::Int32(index as i32)), + Some(ScalarImpl::Int32(index as i32)), + ]); + let k1 = serialize_pk_with_vnode(r1, &pk_serde, VirtualNode::from_index(vnode)); + let v1 = SharedBufferValue::Insert(Bytes::copy_from_slice( + format!("{}-value-{}", vnode, index).as_bytes(), + )); + (k1, v1) + } + + let shared_buffer_batch = { + let kv_pairs = (0..10).map(|i| gen_key_value(0, i, &pk_serde)); + build_batch(kv_pairs) + } + .unwrap(); + + { + // empty read watermark + let read_watermark = ReadTableWatermark { + direction: watermark_direction, + vnode_watermarks: BTreeMap::default(), + }; + + let compaction_catalog_agent_ref = + CompactionCatalogAgent::for_test(vec![TABLE_ID.into()]); + + let mut iter = SkipWatermarkIterator::new( + shared_buffer_batch.clone().into_forward_iter(), + BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))), + compaction_catalog_agent_ref, + ); + + iter.rewind().await.unwrap(); + assert_eq!(iter.is_valid(), true); + for i in 0..10 { + let (k, _v) = gen_key_value(0, i, &pk_serde); + assert_eq!(iter.key().user_key.table_key.as_ref(), k.as_ref()); + iter.next().await.unwrap(); + } + } + + { + // test watermark + let watermark = { + let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]); + let watermark = serialize_pk(r1, &watermark_col_serde); + watermark + }; + + let read_watermark = ReadTableWatermark { + direction: watermark_direction, + vnode_watermarks: BTreeMap::from_iter(once(( + VirtualNode::from_index(0), + watermark.clone(), + ))), + }; + + let full_key_filter_key_extractor = + FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor); + + let table_id_to_vnode = + HashMap::from_iter(once((TABLE_ID.table_id(), VirtualNode::COUNT_FOR_TEST))); + + let table_id_to_watermark_serde = HashMap::from_iter(once(( + TABLE_ID.table_id(), + Some((pk_serde.clone(), watermark_col_serde.clone())), + ))); + + let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new( + full_key_filter_key_extractor, + table_id_to_vnode, + table_id_to_watermark_serde, + )); + + let mut iter = SkipWatermarkIterator::new( + shared_buffer_batch.clone().into_forward_iter(), + BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))), + compaction_catalog_agent_ref, + ); + + iter.rewind().await.unwrap(); + assert_eq!(iter.is_valid(), true); + for i in 5..10 { + let (k, _v) = gen_key_value(0, i, &pk_serde); + assert_eq!(iter.key().user_key.table_key.as_ref(), k.as_ref()); + iter.next().await.unwrap(); + } + assert_eq!(iter.is_valid(), false); + } + } } diff --git a/src/storage/src/hummock/store/local_hummock_storage.rs b/src/storage/src/hummock/store/local_hummock_storage.rs index bedc903283358..00a81feb57de3 100644 --- a/src/storage/src/hummock/store/local_hummock_storage.rs +++ b/src/storage/src/hummock/store/local_hummock_storage.rs @@ -25,6 +25,7 @@ use risingwave_common::hash::VirtualNode; use risingwave_common::util::epoch::MAX_SPILL_TIMES; use risingwave_hummock_sdk::key::{is_empty_key_range, vnode_range, TableKey, TableKeyRange}; use risingwave_hummock_sdk::sstable_info::SstableInfo; +use risingwave_hummock_sdk::table_watermark::WatermarkSerdeType; use risingwave_hummock_sdk::EpochWithGap; use tracing::{warn, Instrument}; @@ -520,7 +521,10 @@ impl LocalStateStore for LocalHummockStorage { next_epoch, prev_epoch ); - if let Some((direction, watermarks)) = &mut opts.table_watermarks { + + if let Some((direction, watermarks, WatermarkSerdeType::PkPrefix)) = + &mut opts.table_watermarks + { let mut read_version = self.read_version.write(); read_version.filter_regress_watermarks(watermarks); if !watermarks.is_empty() { @@ -531,6 +535,7 @@ impl LocalStateStore for LocalHummockStorage { }); } } + if !self.is_replicated && self .event_sender diff --git a/src/storage/src/hummock/store/version.rs b/src/storage/src/hummock/store/version.rs index e20c905b8e26b..ecb33fadc586e 100644 --- a/src/storage/src/hummock/store/version.rs +++ b/src/storage/src/hummock/store/version.rs @@ -34,7 +34,7 @@ use risingwave_hummock_sdk::key::{ use risingwave_hummock_sdk::key_range::KeyRangeCommon; use risingwave_hummock_sdk::sstable_info::SstableInfo; use risingwave_hummock_sdk::table_watermark::{ - TableWatermarksIndex, VnodeWatermark, WatermarkDirection, + TableWatermarksIndex, VnodeWatermark, WatermarkDirection, WatermarkSerdeType, }; use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch, LocalSstableInfo}; use risingwave_pb::hummock::LevelType; @@ -249,19 +249,24 @@ impl HummockReadVersion { Self { table_id, instance_id, - table_watermarks: committed_version.table_watermarks.get(&table_id).map( - |table_watermarks| { - TableWatermarksIndex::new_committed( - table_watermarks.clone(), - committed_version - .state_table_info - .info() - .get(&table_id) - .expect("should exist") - .committed_epoch, - ) - }, - ), + table_watermarks: { + match committed_version.table_watermarks.get(&table_id) { + Some(table_watermarks) => match table_watermarks.watermark_type { + WatermarkSerdeType::PkPrefix => Some(TableWatermarksIndex::new_committed( + table_watermarks.clone(), + committed_version + .state_table_info + .info() + .get(&table_id) + .expect("should exist") + .committed_epoch, + )), + + WatermarkSerdeType::NonPkPrefix => None, /* do not fill the non-pk prefix watermark to index */ + }, + None => None, + } + }, staging: StagingVersion { imm: VecDeque::default(), sst: VecDeque::default(), diff --git a/src/storage/src/mem_table.rs b/src/storage/src/mem_table.rs index 86fe4ecd3f6ec..189b328e8ce62 100644 --- a/src/storage/src/mem_table.rs +++ b/src/storage/src/mem_table.rs @@ -29,7 +29,7 @@ use risingwave_common::catalog::{TableId, TableOption}; use risingwave_common::hash::{VirtualNode, VnodeBitmapExt}; use risingwave_common_estimate_size::{EstimateSize, KvSize}; use risingwave_hummock_sdk::key::{prefixed_range_with_vnode, FullKey, TableKey, TableKeyRange}; -use risingwave_hummock_sdk::table_watermark::WatermarkDirection; +use risingwave_hummock_sdk::table_watermark::{WatermarkDirection, WatermarkSerdeType}; use thiserror::Error; use thiserror_ext::AsReport; use tracing::error; @@ -719,7 +719,7 @@ impl LocalStateStore for MemtableLocalState next_epoch, prev_epoch ); - if let Some((direction, watermarks)) = opts.table_watermarks { + if let Some((direction, watermarks, WatermarkSerdeType::PkPrefix)) = opts.table_watermarks { let delete_ranges = watermarks .iter() .flat_map(|vnode_watermark| { diff --git a/src/storage/src/store.rs b/src/storage/src/store.rs index 2a5405b9f9a17..9f42ccc903419 100644 --- a/src/storage/src/store.rs +++ b/src/storage/src/store.rs @@ -30,7 +30,9 @@ use risingwave_common::catalog::{TableId, TableOption}; use risingwave_common::hash::VirtualNode; use risingwave_common::util::epoch::{Epoch, EpochPair}; use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange}; -use risingwave_hummock_sdk::table_watermark::{VnodeWatermark, WatermarkDirection}; +use risingwave_hummock_sdk::table_watermark::{ + VnodeWatermark, WatermarkDirection, WatermarkSerdeType, +}; use risingwave_hummock_sdk::HummockReadEpoch; use risingwave_hummock_trace::{ TracedInitOptions, TracedNewLocalOptions, TracedOpConsistencyLevel, TracedPrefetchOptions, @@ -797,25 +799,28 @@ impl From for InitOptions { #[derive(Clone, Debug)] pub struct SealCurrentEpochOptions { - pub table_watermarks: Option<(WatermarkDirection, Vec)>, + pub table_watermarks: Option<(WatermarkDirection, Vec, WatermarkSerdeType)>, pub switch_op_consistency_level: Option, } impl From for TracedSealCurrentEpochOptions { fn from(value: SealCurrentEpochOptions) -> Self { TracedSealCurrentEpochOptions { - table_watermarks: value.table_watermarks.map(|(direction, watermarks)| { - ( - direction == WatermarkDirection::Ascending, - watermarks - .into_iter() - .map(|watermark| { - let pb_watermark = PbVnodeWatermark::from(watermark); - Message::encode_to_vec(&pb_watermark) - }) - .collect(), - ) - }), + table_watermarks: value.table_watermarks.map( + |(direction, watermarks, watermark_type)| { + ( + direction == WatermarkDirection::Ascending, + watermarks + .into_iter() + .map(|watermark| { + let pb_watermark = PbVnodeWatermark::from(watermark); + Message::encode_to_vec(&pb_watermark) + }) + .collect(), + watermark_type, + ) + }, + ), switch_op_consistency_level: value .switch_op_consistency_level .map(|level| matches!(level, OpConsistencyLevel::ConsistentOldValue { .. })), @@ -826,23 +831,26 @@ impl From for TracedSealCurrentEpochOptions { impl From for SealCurrentEpochOptions { fn from(value: TracedSealCurrentEpochOptions) -> SealCurrentEpochOptions { SealCurrentEpochOptions { - table_watermarks: value.table_watermarks.map(|(is_ascending, watermarks)| { - ( - if is_ascending { - WatermarkDirection::Ascending - } else { - WatermarkDirection::Descending - }, - watermarks - .into_iter() - .map(|serialized_watermark| { - Message::decode(serialized_watermark.as_slice()) - .map(|pb: PbVnodeWatermark| VnodeWatermark::from(pb)) - .expect("should not failed") - }) - .collect(), - ) - }), + table_watermarks: value.table_watermarks.map( + |(is_ascending, watermarks, is_non_pk_prefix)| { + ( + if is_ascending { + WatermarkDirection::Ascending + } else { + WatermarkDirection::Descending + }, + watermarks + .into_iter() + .map(|serialized_watermark| { + Message::decode(serialized_watermark.as_slice()) + .map(|pb: PbVnodeWatermark| VnodeWatermark::from(pb)) + .expect("should not failed") + }) + .collect(), + is_non_pk_prefix, + ) + }, + ), switch_op_consistency_level: value.switch_op_consistency_level.map(|enable| { if enable { OpConsistencyLevel::ConsistentOldValue { diff --git a/src/stream/src/common/log_store_impl/kv_log_store/writer.rs b/src/stream/src/common/log_store_impl/kv_log_store/writer.rs index 93e4c1211fe61..ab28386579a12 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/writer.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/writer.rs @@ -23,7 +23,9 @@ use risingwave_common::hash::VnodeBitmapExt; use risingwave_common::util::epoch::EpochPair; use risingwave_common_estimate_size::EstimateSize; use risingwave_connector::sink::log_store::{LogStoreResult, LogWriter}; -use risingwave_hummock_sdk::table_watermark::{VnodeWatermark, WatermarkDirection}; +use risingwave_hummock_sdk::table_watermark::{ + VnodeWatermark, WatermarkDirection, WatermarkSerdeType, +}; use risingwave_storage::store::{InitOptions, LocalStateStore, SealCurrentEpochOptions}; use tokio::sync::watch; @@ -179,7 +181,11 @@ impl LogWriter for KvLogStoreWriter { self.state_store.seal_current_epoch( next_epoch, SealCurrentEpochOptions { - table_watermarks: Some((WatermarkDirection::Ascending, watermark)), + table_watermarks: Some(( + WatermarkDirection::Ascending, + watermark, + WatermarkSerdeType::PkPrefix, + )), switch_op_consistency_level: None, }, ); diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index caecb2feb714e..3ee334be2db39 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -42,7 +42,9 @@ use risingwave_hummock_sdk::key::{ end_bound_of_prefix, prefixed_range_with_vnode, start_bound_of_excluded_prefix, CopyFromSlice, TableKey, TableKeyRange, }; -use risingwave_hummock_sdk::table_watermark::{VnodeWatermark, WatermarkDirection}; +use risingwave_hummock_sdk::table_watermark::{ + VnodeWatermark, WatermarkDirection, WatermarkSerdeType, +}; use risingwave_pb::catalog::Table; use risingwave_storage::error::{ErrorKind, StorageError}; use risingwave_storage::hummock::CachePolicy; @@ -255,7 +257,6 @@ where store: S, vnodes: Option>, ) -> Self { - println!("from_table_catalog table_catalog: {:?}", table_catalog); Self::from_table_catalog_with_consistency_level( table_catalog, store, @@ -949,10 +950,6 @@ where /// * `watermark` - Latest watermark received. pub fn update_watermark(&mut self, watermark: ScalarImpl) { trace!(table_id = %self.table_id, watermark = ?watermark, "update watermark"); - println!( - "WATERMARK update watermark table_id: {}, watermark: {:?}", - self.table_id, watermark - ); self.pending_watermark = Some(watermark); } @@ -1080,14 +1077,10 @@ where } /// Commit pending watermark and return vnode bitmap-watermark pairs to seal. - fn commit_pending_watermark(&mut self) -> Option<(WatermarkDirection, Vec)> { + fn commit_pending_watermark( + &mut self, + ) -> Option<(WatermarkDirection, Vec, WatermarkSerdeType)> { let watermark = self.pending_watermark.take(); - println!( - "WATERMARK commit_pending_watermark table_id: {}, watermark: {:?} pk_indices: {:?}", - self.table_id, - watermark, - self.pk_indices() - ); watermark.as_ref().inspect(|watermark| { trace!(table_id = %self.table_id, watermark = ?watermark, "state cleaning"); }); @@ -1098,6 +1091,11 @@ where Some(self.pk_serde.index(self.watermark_col_idx)) }; + let watermark_type = match self.watermark_col_idx { + 0 => WatermarkSerdeType::PkPrefix, + _ => WatermarkSerdeType::NonPkPrefix, + }; + let should_clean_watermark = match watermark { Some(ref watermark) => { if USE_WATERMARK_CACHE && self.watermark_cache.is_synced() { @@ -1127,27 +1125,30 @@ where ) }); - let mut seal_watermark: Option<(WatermarkDirection, VnodeWatermark)> = None; + let mut seal_watermark: Option<(WatermarkDirection, VnodeWatermark, WatermarkSerdeType)> = + None; // Compute Delete Ranges if should_clean_watermark && let Some(watermark_suffix) = watermark_suffix { trace!(table_id = %self.table_id, watermark = ?watermark_suffix, vnodes = ?{ self.vnodes().iter_vnodes().collect_vec() }, "delete range"); - if prefix_serializer + + let order_type = prefix_serializer .as_ref() .unwrap() .get_order_types() - .get(self.watermark_col_idx) - .unwrap() - .is_ascending() - { + .get(0) + .unwrap(); + + if order_type.is_ascending() { seal_watermark = Some(( WatermarkDirection::Ascending, VnodeWatermark::new( self.vnodes().clone(), Bytes::copy_from_slice(watermark_suffix.as_ref()), ), + watermark_type, )); } else { seal_watermark = Some(( @@ -1156,6 +1157,7 @@ where self.vnodes().clone(), Bytes::copy_from_slice(watermark_suffix.as_ref()), ), + watermark_type, )); } } @@ -1172,7 +1174,9 @@ where self.watermark_cache.clear(); } - seal_watermark.map(|(direction, watermark)| (direction, vec![watermark])) + seal_watermark.map(|(direction, watermark, is_non_pk_prefix)| { + (direction, vec![watermark], is_non_pk_prefix) + }) } pub async fn try_flush(&mut self) -> StreamExecutorResult<()> { diff --git a/src/stream/src/common/table/test_state_table.rs b/src/stream/src/common/table/test_state_table.rs index 721c42b3cd746..72482759a8ac2 100644 --- a/src/stream/src/common/table/test_state_table.rs +++ b/src/stream/src/common/table/test_state_table.rs @@ -12,19 +12,33 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashSet; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::ops::Bound::{self, *}; +use std::sync::Arc; +use bytes::BytesMut; use futures::{pin_mut, StreamExt}; +use itertools::Itertools; use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::bitmap::Bitmap; use risingwave_common::catalog::{ColumnDesc, ColumnId, TableId}; +use risingwave_common::hash::VirtualNode; use risingwave_common::row::{self, OwnedRow}; -use risingwave_common::types::{DataType, Scalar, Timestamptz}; +use risingwave_common::types::{DataType, Scalar, ScalarImpl, Timestamptz}; use risingwave_common::util::epoch::{test_epoch, EpochPair}; +use risingwave_common::util::iter_util::ZipEqFast; +use risingwave_common::util::row_serde::OrderedRowSerde; use risingwave_common::util::sort_util::OrderType; use risingwave_common::util::value_encoding::BasicSerde; +use risingwave_hummock_sdk::table_watermark::{ReadTableWatermark, WatermarkDirection}; use risingwave_hummock_test::test_utils::prepare_hummock_test_env; +use risingwave_pb::catalog::PbTable; +use risingwave_pb::common::PbColumnOrder; +use risingwave_pb::plan_common::ColumnCatalog; +use risingwave_storage::compaction_catalog_manager::{ + CompactionCatalogAgent, FilterKeyExtractorImpl, FullKeyFilterKeyExtractor, +}; +use risingwave_storage::hummock::iterator::SkipWatermarkIterator; use risingwave_storage::hummock::HummockStorage; use risingwave_storage::store::PrefetchOptions; use risingwave_storage::table::SINGLETON_VNODE; @@ -2097,3 +2111,132 @@ async fn test_replicated_state_table_replication() { ); } } + +#[tokio::test] +async fn test_non_pk_prefix_watermark_read() { + // Define the base table to replicate + const TEST_TABLE_ID: TableId = TableId { table_id: 233 }; + let order_types = vec![OrderType::ascending(), OrderType::ascending()]; + let column_ids = [ColumnId::from(0), ColumnId::from(1), ColumnId::from(2)]; + let column_descs = vec![ + ColumnDesc::unnamed(column_ids[0], DataType::Int32), + ColumnDesc::unnamed(column_ids[1], DataType::Int32), + ColumnDesc::unnamed(column_ids[2], DataType::Int32), + ]; + let pk_indices = vec![0_usize, 1_usize]; + let read_prefix_len_hint = 1; + let mut table = gen_pbtable( + TEST_TABLE_ID, + column_descs, + order_types, + pk_indices, + read_prefix_len_hint, + ); + + // non-pk-prefix watermark + let watermark_col_idx = 1; + table.watermark_indices = vec![watermark_col_idx]; + let test_env = prepare_hummock_test_env().await; + test_env.register_table(table.clone()).await; + + // Create the base state table + let mut state_table: crate::common::table::state_table::StateTableInner = + StateTable::from_table_catalog_inconsistent_op(&table, test_env.storage.clone(), None) + .await; + + let mut epoch = EpochPair::new_test_epoch(test_epoch(1)); + test_env + .storage + .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); + state_table.init_epoch(epoch).await.unwrap(); + + // Insert first record into base state table + let r1 = OwnedRow::new(vec![ + Some(0_i32.into()), + Some(1_i32.into()), + Some(1_i32.into()), + ]); + state_table.insert(r1.clone()); + + let r2 = OwnedRow::new(vec![ + Some(0_i32.into()), + Some(2_i32.into()), + Some(2_i32.into()), + ]); + state_table.insert(r2.clone()); + + let r3 = OwnedRow::new(vec![ + Some(0_i32.into()), + Some(3_i32.into()), + Some(3_i32.into()), + ]); + state_table.insert(r3.clone()); + + epoch.inc_for_test(); + test_env + .storage + .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); + state_table.commit(epoch).await.unwrap(); + test_env.commit_epoch(epoch.prev).await; + + { + // test read + let item_1 = state_table + .get_row(OwnedRow::new(vec![Some(0_i32.into()), Some(1_i32.into())])) + .await + .unwrap() + .unwrap(); + assert_eq!(r1, item_1); + + let item_2 = state_table + .get_row(OwnedRow::new(vec![Some(0_i32.into()), Some(2_i32.into())])) + .await + .unwrap() + .unwrap(); + + assert_eq!(r2, item_2); + + let item_3 = state_table + .get_row(OwnedRow::new(vec![Some(0_i32.into()), Some(3_i32.into())])) + .await + .unwrap() + .unwrap(); + + assert_eq!(r3, item_3); + } + + { + // update watermark + let watermark = ScalarImpl::Int32(1); + state_table.update_watermark(watermark); + + epoch.inc_for_test(); + test_env + .storage + .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); + state_table.commit(epoch).await.unwrap(); + test_env.commit_epoch(epoch.prev).await; + + // do not rewrite key-range or filter data for non-pk-prefix watermark + let item_1 = state_table + .get_row(OwnedRow::new(vec![Some(0_i32.into()), Some(1_i32.into())])) + .await + .unwrap() + .unwrap(); + assert_eq!(r1, item_1); + + let item_2 = state_table + .get_row(OwnedRow::new(vec![Some(0_i32.into()), Some(2_i32.into())])) + .await + .unwrap() + .unwrap(); + assert_eq!(r2, item_2); + + let item_3 = state_table + .get_row(OwnedRow::new(vec![Some(0_i32.into()), Some(3_i32.into())])) + .await + .unwrap() + .unwrap(); + assert_eq!(r3, item_3); + } +}