Skip to content

Commit

Permalink
refactor(storage): refactor watermark type
Browse files Browse the repository at this point in the history
  • Loading branch information
Li0k committed Dec 25, 2024
1 parent e3dbc73 commit b71eff9
Show file tree
Hide file tree
Showing 16 changed files with 505 additions and 115 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ message TableWatermarks {

// The direction of the table watermark.
bool is_ascending = 2;

// The table watermark is non-pk prefix table watermark.
bool is_non_pk_prefix = 3;
}

message EpochNewChangeLog {
Expand Down Expand Up @@ -194,6 +197,7 @@ message HummockVersion {
map<uint32, TableWatermarks> table_watermarks = 5;
map<uint32, TableChangeLog> table_change_logs = 6;
map<uint32, StateTableInfo> state_table_info = 7;
// map<uint32, TableWatermarks> non_pk_prefix_table_watermarks = 8;
}

message HummockVersionDelta {
Expand Down
1 change: 1 addition & 0 deletions src/storage/hummock_sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ ignored = ["workspace-hack", "num-traits"]
normal = ["workspace-hack"]

[dependencies]
bincode = { version = "=2.0.0-rc.3", features = ["serde"] }
bytes = "1"
easy-ext = "1"
hex = "0.4"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ pub fn safe_epoch_table_watermarks_impl(
Some(TableWatermarks {
watermarks: vec![(*first_epoch, first_epoch_watermark.clone())],
direction: table_watermarks.direction,
watermark_type: table_watermarks.watermark_type,
})
} else {
None
Expand Down
110 changes: 90 additions & 20 deletions src/storage/hummock_sdk/src/table_watermark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::mem::size_of;
use std::ops::Bound::{Excluded, Included, Unbounded};
use std::sync::Arc;

use bincode::{Decode, Encode};
use bytes::Bytes;
use itertools::Itertools;
use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
Expand Down Expand Up @@ -408,19 +409,22 @@ pub struct TableWatermarks {
// later epoch at the back
pub watermarks: Vec<(HummockEpoch, Arc<[VnodeWatermark]>)>,
pub direction: WatermarkDirection,
pub watermark_type: WatermarkSerdeType,
}

impl TableWatermarks {
pub fn single_epoch(
epoch: HummockEpoch,
watermarks: Vec<VnodeWatermark>,
direction: WatermarkDirection,
watermark_type: WatermarkSerdeType,
) -> Self {
let mut this = Self {
direction,
watermarks: Vec::new(),
watermark_type,
};
this.add_new_epoch_watermarks(epoch, watermarks.into(), direction);
this.add_new_epoch_watermarks(epoch, watermarks.into(), direction, watermark_type);
this
}

Expand All @@ -429,8 +433,11 @@ impl TableWatermarks {
epoch: HummockEpoch,
watermarks: Arc<[VnodeWatermark]>,
direction: WatermarkDirection,
watermark_type: WatermarkSerdeType,
) {
assert_eq!(self.direction, direction);
assert_eq!(self.watermark_type, watermark_type);

if let Some((prev_epoch, _)) = self.watermarks.last() {
assert!(*prev_epoch < epoch);
}
Expand Down Expand Up @@ -475,26 +482,42 @@ impl TableWatermarks {
} else {
WatermarkDirection::Descending
},
watermark_type: if pb.is_non_pk_prefix {
WatermarkSerdeType::NonPkPrefix
} else {
WatermarkSerdeType::PkPrefix
},
}
}
}

pub fn merge_multiple_new_table_watermarks(
table_watermarks_list: impl IntoIterator<Item = HashMap<TableId, TableWatermarks>>,
) -> HashMap<TableId, TableWatermarks> {
let mut ret: HashMap<TableId, (WatermarkDirection, BTreeMap<u64, Vec<VnodeWatermark>>)> =
HashMap::new();
#[allow(clippy::type_complexity)]
let mut ret: HashMap<
TableId,
(
WatermarkDirection,
BTreeMap<u64, Vec<VnodeWatermark>>,
WatermarkSerdeType,
),
> = HashMap::new();
for table_watermarks in table_watermarks_list {
for (table_id, new_table_watermarks) in table_watermarks {
let epoch_watermarks = match ret.entry(table_id) {
Entry::Occupied(entry) => {
let (direction, epoch_watermarks) = entry.into_mut();
let (direction, epoch_watermarks, watermark_type) = entry.into_mut();
assert_eq!(&new_table_watermarks.direction, direction);
assert_eq!(&new_table_watermarks.watermark_type, watermark_type);
epoch_watermarks
}
Entry::Vacant(entry) => {
let (_, epoch_watermarks) =
entry.insert((new_table_watermarks.direction, BTreeMap::new()));
let (_, epoch_watermarks, _) = entry.insert((
new_table_watermarks.direction,
BTreeMap::new(),
new_table_watermarks.watermark_type,
));
epoch_watermarks
}
};
Expand All @@ -507,19 +530,22 @@ pub fn merge_multiple_new_table_watermarks(
}
}
ret.into_iter()
.map(|(table_id, (direction, epoch_watermarks))| {
(
table_id,
TableWatermarks {
direction,
// ordered from earlier epoch to later epoch
watermarks: epoch_watermarks
.into_iter()
.map(|(epoch, watermarks)| (epoch, Arc::from(watermarks)))
.collect(),
},
)
})
.map(
|(table_id, (direction, epoch_watermarks, watermark_type))| {
(
table_id,
TableWatermarks {
direction,
// ordered from earlier epoch to later epoch
watermarks: epoch_watermarks
.into_iter()
.map(|(epoch, watermarks)| (epoch, Arc::from(watermarks)))
.collect(),
watermark_type,
},
)
},
)
.collect()
}

Expand Down Expand Up @@ -625,6 +651,7 @@ impl TableWatermarks {
*self = TableWatermarks {
watermarks: result_epoch_watermark,
direction: self.direction,
watermark_type: self.watermark_type,
}
}
}
Expand Down Expand Up @@ -671,6 +698,11 @@ impl From<&PbTableWatermarks> for TableWatermarks {
} else {
WatermarkDirection::Descending
},
watermark_type: if pb.is_non_pk_prefix {
WatermarkSerdeType::NonPkPrefix
} else {
WatermarkSerdeType::PkPrefix
},
}
}
}
Expand All @@ -690,6 +722,10 @@ impl From<&TableWatermarks> for PbTableWatermarks {
WatermarkDirection::Ascending => true,
WatermarkDirection::Descending => false,
},
is_non_pk_prefix: match table_watermarks.watermark_type {
WatermarkSerdeType::NonPkPrefix => true,
WatermarkSerdeType::PkPrefix => false,
},
}
}
}
Expand All @@ -715,6 +751,11 @@ impl From<PbTableWatermarks> for TableWatermarks {
} else {
WatermarkDirection::Descending
},
watermark_type: if pb.is_non_pk_prefix {
WatermarkSerdeType::NonPkPrefix
} else {
WatermarkSerdeType::PkPrefix
},
}
}
}
Expand All @@ -734,10 +775,20 @@ impl From<TableWatermarks> for PbTableWatermarks {
WatermarkDirection::Ascending => true,
WatermarkDirection::Descending => false,
},
is_non_pk_prefix: match table_watermarks.watermark_type {
WatermarkSerdeType::NonPkPrefix => true,
WatermarkSerdeType::PkPrefix => false,
},
}
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Decode, Encode)]
pub enum WatermarkSerdeType {
PkPrefix,
NonPkPrefix,
}

#[cfg(test)]
mod tests {
use std::collections::Bound::Included;
Expand All @@ -758,7 +809,7 @@ mod tests {
use crate::key::{is_empty_key_range, prefixed_range_with_vnode, TableKeyRange};
use crate::table_watermark::{
merge_multiple_new_table_watermarks, TableWatermarks, TableWatermarksIndex, VnodeWatermark,
WatermarkDirection,
WatermarkDirection, WatermarkSerdeType,
};
use crate::version::HummockVersion;

Expand All @@ -778,13 +829,15 @@ mod tests {
let watermark2 = Bytes::from("watermark2");
let watermark3 = Bytes::from("watermark3");
let watermark4 = Bytes::from("watermark4");
let watermark_type = WatermarkSerdeType::PkPrefix;
let mut table_watermarks = TableWatermarks::single_epoch(
epoch1,
vec![VnodeWatermark::new(
build_bitmap(vec![0, 1, 2]),
watermark1.clone(),
)],
direction,
watermark_type,
);
let epoch2 = epoch1.next_epoch();
table_watermarks.add_new_epoch_watermarks(
Expand All @@ -795,6 +848,7 @@ mod tests {
)]
.into(),
direction,
watermark_type,
);

let mut table_watermark_checkpoint = table_watermarks.clone();
Expand All @@ -807,6 +861,7 @@ mod tests {
watermark3.clone(),
)],
direction,
watermark_type,
);
table_watermarks.add_new_epoch_watermarks(
epoch3,
Expand All @@ -816,6 +871,7 @@ mod tests {
)]
.into(),
direction,
watermark_type,
);
let epoch4 = epoch3.next_epoch();
let epoch5 = epoch4.next_epoch();
Expand All @@ -827,6 +883,7 @@ mod tests {
)]
.into(),
direction,
watermark_type,
);
second_table_watermark.add_new_epoch_watermarks(
epoch5,
Expand All @@ -836,6 +893,7 @@ mod tests {
)]
.into(),
direction,
watermark_type,
);

table_watermark_checkpoint.apply_new_table_watermarks(&second_table_watermark);
Expand All @@ -850,13 +908,15 @@ mod tests {
let watermark2 = Bytes::from("watermark2");
let watermark3 = Bytes::from("watermark3");
let watermark4 = Bytes::from("watermark4");
let watermark_type = WatermarkSerdeType::PkPrefix;
let mut table_watermarks = TableWatermarks::single_epoch(
epoch1,
vec![VnodeWatermark::new(
build_bitmap(vec![0, 1, 2]),
watermark1.clone(),
)],
direction,
watermark_type,
);
let epoch2 = epoch1.next_epoch();
table_watermarks.add_new_epoch_watermarks(
Expand All @@ -867,6 +927,7 @@ mod tests {
)]
.into(),
direction,
watermark_type,
);
let epoch3 = epoch2.next_epoch();
table_watermarks.add_new_epoch_watermarks(
Expand All @@ -877,6 +938,7 @@ mod tests {
)]
.into(),
direction,
watermark_type,
);
let epoch4 = epoch3.next_epoch();
let epoch5 = epoch4.next_epoch();
Expand All @@ -888,6 +950,7 @@ mod tests {
)]
.into(),
direction,
watermark_type,
);

let mut table_watermarks_checkpoint = table_watermarks.clone();
Expand Down Expand Up @@ -925,6 +988,7 @@ mod tests {
)
],
direction,
watermark_type,
}
);

Expand All @@ -951,6 +1015,7 @@ mod tests {
)
],
direction,
watermark_type,
}
);

Expand All @@ -977,6 +1042,7 @@ mod tests {
)
],
direction,
watermark_type,
}
);

Expand All @@ -996,6 +1062,7 @@ mod tests {
.into()
)],
direction,
watermark_type,
}
);
}
Expand Down Expand Up @@ -1026,6 +1093,7 @@ mod tests {
.map(|epoch: u64| epoch_new_watermark(epoch, vec![&bitmap]))
.collect(),
direction: WatermarkDirection::Ascending,
watermark_type: WatermarkSerdeType::PkPrefix,
}
}
let table1_watermark1 = build_table_watermark(0..3, vec![1, 2, 4]);
Expand All @@ -1050,6 +1118,7 @@ mod tests {
epoch_new_watermark(5, vec![&build_bitmap(4..6)]),
],
direction: WatermarkDirection::Ascending,
watermark_type: WatermarkSerdeType::PkPrefix,
},
);
expected.insert(TableId::new(2), table2_watermark);
Expand Down Expand Up @@ -1235,6 +1304,7 @@ mod tests {
.into(),
)],
direction: WatermarkDirection::Ascending,
watermark_type: WatermarkSerdeType::PkPrefix,
}
.into(),
);
Expand Down
Loading

0 comments on commit b71eff9

Please sign in to comment.