diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index 2352e0db7..8bcb5df50 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -45,6 +45,8 @@ pub(crate) const SET_TRANSACTION_NAME: &str = "txn"; pub(crate) const COMMIT_INFO_NAME: &str = "commitInfo"; #[cfg_attr(feature = "developer-visibility", visibility::make(pub))] pub(crate) const CDC_NAME: &str = "cdc"; +#[cfg_attr(feature = "developer-visibility", visibility::make(pub))] +pub(crate) const SIDECAR_NAME: &str = "sidecar"; static LOG_ADD_SCHEMA: LazyLock = LazyLock::new(|| StructType::new([Option::::get_struct_field(ADD_NAME)]).into()); @@ -58,6 +60,7 @@ static LOG_SCHEMA: LazyLock = LazyLock::new(|| { Option::::get_struct_field(SET_TRANSACTION_NAME), Option::::get_struct_field(COMMIT_INFO_NAME), Option::::get_struct_field(CDC_NAME), + Option::::get_struct_field(SIDECAR_NAME), // We don't support the following actions yet //Option::::get_struct_field(DOMAIN_METADATA_NAME), ]) @@ -326,9 +329,8 @@ where #[derive(Debug, Clone, PartialEq, Eq, Schema)] #[cfg_attr(feature = "developer-visibility", visibility::make(pub))] -#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))] #[cfg_attr(test, derive(Serialize, Default), serde(rename_all = "camelCase"))] -struct CommitInfo { +pub(crate) struct CommitInfo { /// The time this logical file was created, as milliseconds since the epoch. /// Read: optional, write: required (that is, kernel always writes). pub(crate) timestamp: Option, @@ -417,9 +419,8 @@ impl Add { #[derive(Debug, Clone, PartialEq, Eq, Schema)] #[cfg_attr(feature = "developer-visibility", visibility::make(pub))] -#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))] #[cfg_attr(test, derive(Serialize, Default), serde(rename_all = "camelCase"))] -struct Remove { +pub(crate) struct Remove { /// A relative path to a data file from the root of the table or an absolute path to a file /// that should be added to the table. 
The path is a URI as specified by /// [RFC 2396 URI Generic Syntax], which needs to be decoded to get the data file path. @@ -468,9 +469,8 @@ struct Remove { #[derive(Debug, Clone, PartialEq, Eq, Schema)] #[cfg_attr(feature = "developer-visibility", visibility::make(pub))] -#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))] #[cfg_attr(test, derive(Serialize, Default), serde(rename_all = "camelCase"))] -struct Cdc { +pub(crate) struct Cdc { /// A relative path to a change data file from the root of the table or an absolute path to a /// change data file that should be added to the table. The path is a URI as specified by /// [RFC 2396 URI Generic Syntax], which needs to be decoded to get the file path. @@ -511,6 +511,33 @@ pub struct SetTransaction { pub last_updated: Option, } +/// The sidecar action references a sidecar file which provides some of the checkpoint's +/// file actions. This action is only allowed in checkpoints following the V2 spec. +/// +/// [More info]: https://github.com/delta-io/delta/blob/master/PROTOCOL.md#sidecar-file-information +#[allow(unused)] //TODO: Remove once we implement V2 checkpoint file processing +#[derive(Schema, Debug, PartialEq)] +#[cfg_attr(feature = "developer-visibility", visibility::make(pub))] +pub(crate) struct Sidecar { + /// A path to a sidecar file that can be either: + /// - A relative path (just the file name) within the `_delta_log/_sidecars` directory. + /// - An absolute path + /// The path is a URI as specified by [RFC 2396 URI Generic Syntax], which needs to be decoded + /// to get the file path. + /// + /// [RFC 2396 URI Generic Syntax]: https://www.ietf.org/rfc/rfc2396.txt + pub path: String, + + /// The size of the sidecar file in bytes. + pub size_in_bytes: i64, + + /// The time this logical file was created, as milliseconds since the epoch. + pub modification_time: i64, + + /// A map containing any additional metadata about the logical file. 
+ pub tags: Option>, +} + #[cfg(test)] mod tests { use std::sync::Arc; @@ -637,7 +664,7 @@ mod tests { fn test_cdc_schema() { let schema = get_log_schema() .project(&[CDC_NAME]) - .expect("Couldn't get remove field"); + .expect("Couldn't get cdc field"); let expected = Arc::new(StructType::new([StructField::nullable( "cdc", StructType::new([ @@ -654,6 +681,23 @@ mod tests { assert_eq!(schema, expected); } + #[test] + fn test_sidecar_schema() { + let schema = get_log_schema() + .project(&[SIDECAR_NAME]) + .expect("Couldn't get sidecar field"); + let expected = Arc::new(StructType::new([StructField::nullable( + "sidecar", + StructType::new([ + StructField::not_null("path", DataType::STRING), + StructField::not_null("sizeInBytes", DataType::LONG), + StructField::not_null("modificationTime", DataType::LONG), + tags_field(), + ]), + )])); + assert_eq!(schema, expected); + } + #[test] fn test_transaction_schema() { let schema = get_log_schema() diff --git a/kernel/src/actions/visitors.rs b/kernel/src/actions/visitors.rs index 957befe80..9f34bd2c5 100644 --- a/kernel/src/actions/visitors.rs +++ b/kernel/src/actions/visitors.rs @@ -12,14 +12,13 @@ use crate::{DeltaResult, Error}; use super::deletion_vector::DeletionVectorDescriptor; use super::schemas::ToSchema as _; use super::{ - Add, Cdc, Format, Metadata, Protocol, Remove, SetTransaction, ADD_NAME, CDC_NAME, - METADATA_NAME, PROTOCOL_NAME, REMOVE_NAME, SET_TRANSACTION_NAME, + Add, Cdc, Format, Metadata, Protocol, Remove, SetTransaction, Sidecar, ADD_NAME, CDC_NAME, + METADATA_NAME, PROTOCOL_NAME, REMOVE_NAME, SET_TRANSACTION_NAME, SIDECAR_NAME, }; #[derive(Default)] #[cfg_attr(feature = "developer-visibility", visibility::make(pub))] -#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))] -struct MetadataVisitor { +pub(crate) struct MetadataVisitor { pub(crate) metadata: Option, } @@ -114,8 +113,7 @@ impl RowVisitor for SelectionVectorVisitor { #[derive(Default)] #[cfg_attr(feature = 
"developer-visibility", visibility::make(pub))] -#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))] -struct ProtocolVisitor { +pub(crate) struct ProtocolVisitor { pub(crate) protocol: Option, } @@ -318,15 +316,13 @@ impl RowVisitor for RemoveVisitor { #[allow(unused)] #[derive(Default)] #[cfg_attr(feature = "developer-visibility", visibility::make(pub))] -#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))] -struct CdcVisitor { +pub(crate) struct CdcVisitor { pub(crate) cdcs: Vec, } impl CdcVisitor { #[cfg_attr(feature = "developer-visibility", visibility::make(pub))] - #[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))] - fn visit_cdc<'a>( + pub(crate) fn visit_cdc<'a>( row_index: usize, path: String, getters: &[&'a dyn GetData<'a>], @@ -377,7 +373,6 @@ pub type SetTransactionMap = HashMap; /// #[derive(Default, Debug)] #[cfg_attr(feature = "developer-visibility", visibility::make(pub))] -#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))] pub(crate) struct SetTransactionVisitor { pub(crate) set_transactions: SetTransactionMap, pub(crate) application_id: Option, @@ -393,8 +388,7 @@ impl SetTransactionVisitor { } #[cfg_attr(feature = "developer-visibility", visibility::make(pub))] - #[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))] - fn visit_txn<'a>( + pub(crate) fn visit_txn<'a>( row_index: usize, app_id: String, getters: &[&'a dyn GetData<'a>], @@ -444,6 +438,52 @@ impl RowVisitor for SetTransactionVisitor { } } +#[allow(unused)] //TODO: Remove once we implement V2 checkpoint file processing +#[derive(Default)] +#[cfg_attr(feature = "developer-visibility", visibility::make(pub))] +pub(crate) struct SidecarVisitor { + pub(crate) sidecars: Vec, +} + +impl SidecarVisitor { + fn visit_sidecar<'a>( + row_index: usize, + path: String, + getters: &[&'a dyn GetData<'a>], + ) -> DeltaResult { + Ok(Sidecar { + path, + 
size_in_bytes: getters[1].get(row_index, "sidecar.sizeInBytes")?, + modification_time: getters[2].get(row_index, "sidecar.modificationTime")?, + tags: getters[3].get_opt(row_index, "sidecar.tags")?, + }) + } +} + +impl RowVisitor for SidecarVisitor { + fn selected_column_names_and_types(&self) -> (&'static [ColumnName], &'static [DataType]) { + static NAMES_AND_TYPES: LazyLock = + LazyLock::new(|| Sidecar::to_schema().leaves(SIDECAR_NAME)); + NAMES_AND_TYPES.as_ref() + } + fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> { + require!( + getters.len() == 4, + Error::InternalError(format!( + "Wrong number of SidecarVisitor getters: {}", + getters.len() + )) + ); + for i in 0..row_count { + // Since path column is required, use it to detect presence of a sidecar action + if let Some(path) = getters[0].get_opt(i, "sidecar.path")? { + self.sidecars.push(Self::visit_sidecar(i, path, getters)?); + } + } + Ok(()) + } +} + /// Get a DV out of some engine data. The caller is responsible for slicing the `getters` slice such /// that the first element contains the `storageType` element of the deletion vector. 
pub(crate) fn visit_deletion_vector_at<'a>( @@ -501,7 +541,8 @@ mod tests { r#"{"commitInfo":{"timestamp":1677811178585,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"10","numOutputBytes":"635"},"engineInfo":"Databricks-Runtime/","txnId":"a6a94671-55ef-450e-9546-b8465b9147de"}}"#, r#"{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["deletionVectors"],"writerFeatures":["deletionVectors"]}}"#, r#"{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableDeletionVectors":"true","delta.columnMapping.mode":"none", "delta.enableChangeDataFeed":"true"},"createdTime":1677811175819}}"#, - r#"{"cdc":{"path":"_change_data/age=21/cdc-00000-93f7fceb-281a-446a-b221-07b88132d203.c000.snappy.parquet","partitionValues":{"age":"21"},"size":1033,"dataChange":false}}"# + r#"{"cdc":{"path":"_change_data/age=21/cdc-00000-93f7fceb-281a-446a-b221-07b88132d203.c000.snappy.parquet","partitionValues":{"age":"21"},"size":1033,"dataChange":false}}"#, + r#"{"sidecar":{"path":"016ae953-37a9-438e-8683-9a9a4a79a395.parquet","sizeInBytes":9268,"modificationTime":1714496113961,"tags":{"tag_foo":"tag_bar"}}}"#, ] .into(); let output_schema = get_log_schema().clone(); @@ -544,6 +585,29 @@ mod tests { Ok(()) } + #[test] + fn test_parse_sidecar() -> DeltaResult<()> { + let data = action_batch(); + + let mut visitor = SidecarVisitor::default(); + visitor.visit_rows_of(data.as_ref())?; + + let sidecar1 = Sidecar { + path: "016ae953-37a9-438e-8683-9a9a4a79a395.parquet".into(), + size_in_bytes: 9268, + modification_time: 1714496113961, + tags: Some(HashMap::from([( + "tag_foo".to_string(), + "tag_bar".to_string(), + )])), + }; + + 
assert_eq!(visitor.sidecars.len(), 1); + assert_eq!(visitor.sidecars[0], sidecar1); + + Ok(()) + } + #[test] fn test_parse_metadata() -> DeltaResult<()> { let data = action_batch();