Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add visitor SidecarVisitor and Sidecar action struct #673

Merged
merged 8 commits into from
Feb 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 51 additions & 7 deletions kernel/src/actions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ pub(crate) const SET_TRANSACTION_NAME: &str = "txn";
pub(crate) const COMMIT_INFO_NAME: &str = "commitInfo";
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) const CDC_NAME: &str = "cdc";
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) const SIDECAR_NAME: &str = "sidecar";

static LOG_ADD_SCHEMA: LazyLock<SchemaRef> =
LazyLock::new(|| StructType::new([Option::<Add>::get_struct_field(ADD_NAME)]).into());
Expand All @@ -58,6 +60,7 @@ static LOG_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
Option::<SetTransaction>::get_struct_field(SET_TRANSACTION_NAME),
Option::<CommitInfo>::get_struct_field(COMMIT_INFO_NAME),
Option::<Cdc>::get_struct_field(CDC_NAME),
Option::<Sidecar>::get_struct_field(SIDECAR_NAME),
// We don't support the following actions yet
//Option::<DomainMetadata>::get_struct_field(DOMAIN_METADATA_NAME),
])
Expand Down Expand Up @@ -326,9 +329,8 @@ where

#[derive(Debug, Clone, PartialEq, Eq, Schema)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
#[cfg_attr(test, derive(Serialize, Default), serde(rename_all = "camelCase"))]
struct CommitInfo {
pub(crate) struct CommitInfo {
/// The time this logical file was created, as milliseconds since the epoch.
/// Read: optional, write: required (that is, kernel always writes).
pub(crate) timestamp: Option<i64>,
Expand Down Expand Up @@ -417,9 +419,8 @@ impl Add {

#[derive(Debug, Clone, PartialEq, Eq, Schema)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
#[cfg_attr(test, derive(Serialize, Default), serde(rename_all = "camelCase"))]
struct Remove {
pub(crate) struct Remove {
/// A relative path to a data file from the root of the table or an absolute path to a file
/// that should be added to the table. The path is a URI as specified by
/// [RFC 2396 URI Generic Syntax], which needs to be decoded to get the data file path.
Expand Down Expand Up @@ -468,9 +469,8 @@ struct Remove {

#[derive(Debug, Clone, PartialEq, Eq, Schema)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
#[cfg_attr(test, derive(Serialize, Default), serde(rename_all = "camelCase"))]
struct Cdc {
pub(crate) struct Cdc {
/// A relative path to a change data file from the root of the table or an absolute path to a
/// change data file that should be added to the table. The path is a URI as specified by
/// [RFC 2396 URI Generic Syntax], which needs to be decoded to get the file path.
Expand Down Expand Up @@ -511,6 +511,33 @@ pub struct SetTransaction {
pub last_updated: Option<i64>,
}

/// The sidecar action references a sidecar file which provides some of the checkpoint's
/// file actions. This action is only allowed in checkpoints following the V2 spec.
///
/// [More info]: https://github.com/delta-io/delta/blob/master/PROTOCOL.md#sidecar-file-information
#[allow(unused)] //TODO: Remove once we implement V2 checkpoint file processing
#[derive(Schema, Debug, PartialEq)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) struct Sidecar {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: could add some docs and link to sidecar action in protocol?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good idea 👍

/// A path to a sidecar file that can be either:
/// - A relative path (just the file name) within the `_delta_log/_sidecars` directory.
/// - An absolute path
/// The path is a URI as specified by [RFC 2396 URI Generic Syntax], which needs to be decoded
/// to get the file path.
///
/// [RFC 2396 URI Generic Syntax]: https://www.ietf.org/rfc/rfc2396.txt
pub path: String,

/// The size of the sidecar file in bytes.
pub size_in_bytes: i64,

/// The time this logical file was created, as milliseconds since the epoch.
pub modification_time: i64,

/// A map containing any additional metadata about the logicial file.
pub tags: Option<HashMap<String, String>>,
}

#[cfg(test)]
mod tests {
use std::sync::Arc;
Expand Down Expand Up @@ -637,7 +664,7 @@ mod tests {
fn test_cdc_schema() {
let schema = get_log_schema()
.project(&[CDC_NAME])
.expect("Couldn't get remove field");
.expect("Couldn't get cdc field");
let expected = Arc::new(StructType::new([StructField::nullable(
"cdc",
StructType::new([
Expand All @@ -654,6 +681,23 @@ mod tests {
assert_eq!(schema, expected);
}

#[test]
fn test_sidecar_schema() {
let schema = get_log_schema()
.project(&[SIDECAR_NAME])
.expect("Couldn't get sidecar field");
let expected = Arc::new(StructType::new([StructField::nullable(
"sidecar",
StructType::new([
StructField::not_null("path", DataType::STRING),
StructField::not_null("sizeInBytes", DataType::LONG),
StructField::not_null("modificationTime", DataType::LONG),
tags_field(),
]),
)]));
assert_eq!(schema, expected);
}

#[test]
fn test_transaction_schema() {
let schema = get_log_schema()
Expand Down
92 changes: 78 additions & 14 deletions kernel/src/actions/visitors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,13 @@ use crate::{DeltaResult, Error};
use super::deletion_vector::DeletionVectorDescriptor;
use super::schemas::ToSchema as _;
use super::{
Add, Cdc, Format, Metadata, Protocol, Remove, SetTransaction, ADD_NAME, CDC_NAME,
METADATA_NAME, PROTOCOL_NAME, REMOVE_NAME, SET_TRANSACTION_NAME,
Add, Cdc, Format, Metadata, Protocol, Remove, SetTransaction, Sidecar, ADD_NAME, CDC_NAME,
METADATA_NAME, PROTOCOL_NAME, REMOVE_NAME, SET_TRANSACTION_NAME, SIDECAR_NAME,
};

#[derive(Default)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
struct MetadataVisitor {
pub(crate) struct MetadataVisitor {
pub(crate) metadata: Option<Metadata>,
}

Expand Down Expand Up @@ -114,8 +113,7 @@ impl RowVisitor for SelectionVectorVisitor {

#[derive(Default)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
struct ProtocolVisitor {
pub(crate) struct ProtocolVisitor {
pub(crate) protocol: Option<Protocol>,
}

Expand Down Expand Up @@ -318,15 +316,13 @@ impl RowVisitor for RemoveVisitor {
#[allow(unused)]
#[derive(Default)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
struct CdcVisitor {
pub(crate) struct CdcVisitor {
pub(crate) cdcs: Vec<Cdc>,
}

impl CdcVisitor {
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
fn visit_cdc<'a>(
pub(crate) fn visit_cdc<'a>(
row_index: usize,
path: String,
getters: &[&'a dyn GetData<'a>],
Expand Down Expand Up @@ -377,7 +373,6 @@ pub type SetTransactionMap = HashMap<String, SetTransaction>;
///
#[derive(Default, Debug)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
pub(crate) struct SetTransactionVisitor {
pub(crate) set_transactions: SetTransactionMap,
pub(crate) application_id: Option<String>,
Expand All @@ -393,8 +388,7 @@ impl SetTransactionVisitor {
}

#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
fn visit_txn<'a>(
pub(crate) fn visit_txn<'a>(
row_index: usize,
app_id: String,
getters: &[&'a dyn GetData<'a>],
Expand Down Expand Up @@ -444,6 +438,52 @@ impl RowVisitor for SetTransactionVisitor {
}
}

#[allow(unused)] //TODO: Remove once we implement V2 checkpoint file processing
#[derive(Default)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) struct SidecarVisitor {
pub(crate) sidecars: Vec<Sidecar>,
}

impl SidecarVisitor {
fn visit_sidecar<'a>(
row_index: usize,
path: String,
getters: &[&'a dyn GetData<'a>],
) -> DeltaResult<Sidecar> {
Ok(Sidecar {
path,
size_in_bytes: getters[1].get(row_index, "sidecar.sizeInBytes")?,
modification_time: getters[2].get(row_index, "sidecar.modificationTime")?,
tags: getters[3].get_opt(row_index, "sidecar.tags")?,
})
}
}

impl RowVisitor for SidecarVisitor {
fn selected_column_names_and_types(&self) -> (&'static [ColumnName], &'static [DataType]) {
static NAMES_AND_TYPES: LazyLock<ColumnNamesAndTypes> =
LazyLock::new(|| Sidecar::to_schema().leaves(SIDECAR_NAME));
NAMES_AND_TYPES.as_ref()
}
fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> {
require!(
getters.len() == 4,
Error::InternalError(format!(
"Wrong number of SidecarVisitor getters: {}",
getters.len()
))
);
for i in 0..row_count {
// Since path column is required, use it to detect presence of a sidecar action
if let Some(path) = getters[0].get_opt(i, "sidecar.path")? {
self.sidecars.push(Self::visit_sidecar(i, path, getters)?);
}
}
Ok(())
}
}

/// Get a DV out of some engine data. The caller is responsible for slicing the `getters` slice such
/// that the first element contains the `storageType` element of the deletion vector.
pub(crate) fn visit_deletion_vector_at<'a>(
Expand Down Expand Up @@ -501,7 +541,8 @@ mod tests {
r#"{"commitInfo":{"timestamp":1677811178585,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"10","numOutputBytes":"635"},"engineInfo":"Databricks-Runtime/<unknown>","txnId":"a6a94671-55ef-450e-9546-b8465b9147de"}}"#,
r#"{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["deletionVectors"],"writerFeatures":["deletionVectors"]}}"#,
r#"{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableDeletionVectors":"true","delta.columnMapping.mode":"none", "delta.enableChangeDataFeed":"true"},"createdTime":1677811175819}}"#,
r#"{"cdc":{"path":"_change_data/age=21/cdc-00000-93f7fceb-281a-446a-b221-07b88132d203.c000.snappy.parquet","partitionValues":{"age":"21"},"size":1033,"dataChange":false}}"#
r#"{"cdc":{"path":"_change_data/age=21/cdc-00000-93f7fceb-281a-446a-b221-07b88132d203.c000.snappy.parquet","partitionValues":{"age":"21"},"size":1033,"dataChange":false}}"#,
r#"{"sidecar":{"path":"016ae953-37a9-438e-8683-9a9a4a79a395.parquet","sizeInBytes":9268,"modificationTime":1714496113961,"tags":{"tag_foo":"tag_bar"}}}"#,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

qq: would a relative path look like this or would it be _sidecar/016ae953-37a9-438e-8683-9a9a4a79a395.parquet?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A relative path would just be the filename.

]
.into();
let output_schema = get_log_schema().clone();
Expand Down Expand Up @@ -544,6 +585,29 @@ mod tests {
Ok(())
}

#[test]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aside: None of these tests check the error case where a required field is missing 🤔 Maybe we should make an issue to add those. Personally I'd also like to see a VisitorError type so that we can make stronger assertions on the failure cases.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good idea! #674

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

couldn't we also add a trivial case just for these additions to see it fail if sidecar.path is missing? (and keep the issue to extend to other cases?)

fn test_parse_sidecar() -> DeltaResult<()> {
let data = action_batch();

let mut visitor = SidecarVisitor::default();
visitor.visit_rows_of(data.as_ref())?;

let sidecar1 = Sidecar {
path: "016ae953-37a9-438e-8683-9a9a4a79a395.parquet".into(),
size_in_bytes: 9268,
modification_time: 1714496113961,
tags: Some(HashMap::from([(
"tag_foo".to_string(),
"tag_bar".to_string(),
)])),
};

assert_eq!(visitor.sidecars.len(), 1);
assert_eq!(visitor.sidecars[0], sidecar1);

Ok(())
}

#[test]
fn test_parse_metadata() -> DeltaResult<()> {
let data = action_batch();
Expand Down
Loading