Skip to content

Commit

Permalink
Address pr comments
Browse files Browse the repository at this point in the history
  • Loading branch information
OussamaSaoudi-db committed Nov 22, 2024
1 parent 6d972a6 commit d3eb161
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 16 deletions.
8 changes: 4 additions & 4 deletions kernel/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,8 @@ pub enum Error {
#[error("Unsupported: {0}")]
Unsupported(String),

#[error("Table changes disabled as of version {0}")]
TableChangesDisabled(Version),
#[error("Change data feed is unsupported for the table at version {0}")]
ChangeDataFeedUnsupported(Version),
}

// Convenience constructors for Error types that take a String argument
Expand Down Expand Up @@ -246,8 +246,8 @@ impl Error {
pub fn unsupported(msg: impl ToString) -> Self {
Self::Unsupported(msg.to_string())
}
pub fn table_changes_disabled(version: impl Into<Version>) -> Self {
Self::TableChangesDisabled(version.into())
pub fn change_data_feed_unsupported(version: impl Into<Version>) -> Self {
Self::ChangeDataFeedUnsupported(version.into())
}

// Capture a backtrace when the error is constructed.
Expand Down
37 changes: 25 additions & 12 deletions kernel/src/table_changes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,19 +54,24 @@ impl TableChanges {
/// - `table_root`: url pointing at the table root (where `_delta_log` folder is located)
/// - `engine`: Implementation of [`Engine`] apis.
/// - `start_version`: The start version of the change data feed
/// - `end_version`: The end version of the change data feed. If this is none, this defaults to
/// the newest table version.
/// - `end_version`: The end version (inclusive) of the change data feed. If this is none, this
/// defaults to the newest table version.
pub fn try_new(
table_root: Url,
engine: &dyn Engine,
start_version: Version,
end_version: Option<Version>,
) -> DeltaResult<Self> {
// Both snapshots ensure that reading is supported at the start and end version using
// [`Protocol::ensure_read_supported`]. Note that we must still verify that reading is
// supported for every intermediary protocol actions
// [`Protocol`]: crate::actions::Protocol
let start_snapshot =
Snapshot::try_new(table_root.as_url().clone(), engine, Some(start_version))?;
let end_snapshot = Snapshot::try_new(table_root.as_url().clone(), engine, end_version)?;

// Verify CDF is enabled at the beginning and end of the interval
// Verify CDF is enabled at the beginning and end of the interval. We must still check every
// intermediary metadata action in the range.
let is_cdf_enabled = |snapshot: &Snapshot| {
static ENABLE_CDF_FLAG: &str = "delta.enableChangeDataFeed";
snapshot
Expand All @@ -76,10 +81,11 @@ impl TableChanges {
.is_some_and(|val| val == "true")
};
if !is_cdf_enabled(&start_snapshot) {
return Err(Error::table_changes_disabled(start_version));
return Err(Error::change_data_feed_unsupported(start_version));
} else if !is_cdf_enabled(&end_snapshot) {
return Err(Error::table_changes_disabled(end_snapshot.version()));
return Err(Error::change_data_feed_unsupported(end_snapshot.version()));
}

if start_snapshot.schema() != end_snapshot.schema() {
return Err(Error::generic(format!(
"Failed to build TableChanges: Start and end version schemas are different. Found start version schema {:?} and end version schema {:?}", start_snapshot.schema(), end_snapshot.schema(),
Expand Down Expand Up @@ -110,10 +116,12 @@ impl TableChanges {
schema,
})
}

/// The start version of the `TableChanges`.
pub fn start_version(&self) -> Version {
self.start_version
}
/// The end version of the `TableChanges`. If no end_version was specified in
/// The end version (inclusive) of the [`TableChanges`]. If no `end_version` was specified in
/// [`TableChanges::try_new`], this returns the newest version as of the call to `try_new`.
pub fn end_version(&self) -> Version {
self.log_segment.end_version
Expand All @@ -123,13 +131,16 @@ impl TableChanges {
pub fn schema(&self) -> &Schema {
&self.schema
}
/// Path to the root of the table that is being read.
pub fn table_root(&self) -> &Url {
&self.table_root
}
/// The partition columns that will be read.
#[allow(unused)]
pub(crate) fn partition_columns(&self) -> &Vec<String> {
&self.end_snapshot.metadata().partition_columns
}
/// The column mapping mode at the end schema.
#[allow(unused)]
pub(crate) fn column_mapping_mode(&self) -> &ColumnMappingMode {
&self.end_snapshot.column_mapping_mode
Expand All @@ -147,7 +158,8 @@ mod tests {
use itertools::assert_equal;

#[test]
fn test_enable_cdf_flag() {
fn table_changes_checks_enable_cdf_flag() {
// Table with CDF enabled, then disabled at 2 and enabled at 3
let path = "./tests/data/table-with-cdf";
let engine = Box::new(SyncEngine::new());
let table = Table::try_from_uri(path).unwrap();
Expand All @@ -161,25 +173,26 @@ mod tests {
assert_eq!(table_changes.end_version(), end_version);
}

let invalid_ranges = [(0, Some(2)), (1, Some(2)), (2, Some(2))];
let invalid_ranges = [(0, 2), (1, 2), (2, 2), (2, 3)];
for (start_version, end_version) in invalid_ranges {
let res = table.table_changes(engine.as_ref(), start_version, end_version);
assert!(matches!(res, Err(Error::TableChangesDisabled(_))))
assert!(matches!(res, Err(Error::ChangeDataFeedUnsupported(_))))
}
}
#[test]
fn test_schema_evolution_fails() {
fn schema_evolution_fails() {
let path = "./tests/data/table-with-cdf";
let engine = Box::new(SyncEngine::new());
let table = Table::try_from_uri(path).unwrap();
let expected_msg = "Failed to build TableChanges: Start and end version schemas are different. Found start version schema StructType { type_name: \"struct\", fields: {\"part\": StructField { name: \"part\", data_type: Primitive(Integer), nullable: true, metadata: {} }, \"id\": StructField { name: \"id\", data_type: Primitive(Integer), nullable: true, metadata: {} }} } and end version schema StructType { type_name: \"struct\", fields: {\"part\": StructField { name: \"part\", data_type: Primitive(Integer), nullable: true, metadata: {} }, \"id\": StructField { name: \"id\", data_type: Primitive(Integer), nullable: false, metadata: {} }} }";

// A field in the schema goes from being nullable to non-nullable
let table_changes_res = table.table_changes(engine.as_ref(), 3, 4);
assert!(matches!(table_changes_res, Err(Error::Generic(_))));
assert!(matches!(table_changes_res, Err(Error::Generic(msg)) if msg == expected_msg));
}

#[test]
fn test_table_changes_has_cdf_schema() {
fn table_changes_has_cdf_schema() {
let path = "./tests/data/table-with-cdf";
let engine = Box::new(SyncEngine::new());
let table = Table::try_from_uri(path).unwrap();
Expand Down

0 comments on commit d3eb161

Please sign in to comment.