-
Notifications
You must be signed in to change notification settings - Fork 61
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add visitor `SidecarVisitor` and `Sidecar` action struct
#673
Changes from all commits
ba66f71
6ea6f14
3a25515
c428628
cfe08bc
ffb4d64
b45d9a0
2111b8f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,14 +12,13 @@ use crate::{DeltaResult, Error}; | |
use super::deletion_vector::DeletionVectorDescriptor; | ||
use super::schemas::ToSchema as _; | ||
use super::{ | ||
Add, Cdc, Format, Metadata, Protocol, Remove, SetTransaction, ADD_NAME, CDC_NAME, | ||
METADATA_NAME, PROTOCOL_NAME, REMOVE_NAME, SET_TRANSACTION_NAME, | ||
Add, Cdc, Format, Metadata, Protocol, Remove, SetTransaction, Sidecar, ADD_NAME, CDC_NAME, | ||
METADATA_NAME, PROTOCOL_NAME, REMOVE_NAME, SET_TRANSACTION_NAME, SIDECAR_NAME, | ||
}; | ||
|
||
#[derive(Default)] | ||
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))] | ||
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))] | ||
struct MetadataVisitor { | ||
pub(crate) struct MetadataVisitor { | ||
pub(crate) metadata: Option<Metadata>, | ||
} | ||
|
||
|
@@ -114,8 +113,7 @@ impl RowVisitor for SelectionVectorVisitor { | |
|
||
#[derive(Default)] | ||
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))] | ||
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))] | ||
struct ProtocolVisitor { | ||
pub(crate) struct ProtocolVisitor { | ||
pub(crate) protocol: Option<Protocol>, | ||
} | ||
|
||
|
@@ -318,15 +316,13 @@ impl RowVisitor for RemoveVisitor { | |
#[allow(unused)] | ||
#[derive(Default)] | ||
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))] | ||
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))] | ||
struct CdcVisitor { | ||
pub(crate) struct CdcVisitor { | ||
pub(crate) cdcs: Vec<Cdc>, | ||
} | ||
|
||
impl CdcVisitor { | ||
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))] | ||
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))] | ||
fn visit_cdc<'a>( | ||
pub(crate) fn visit_cdc<'a>( | ||
row_index: usize, | ||
path: String, | ||
getters: &[&'a dyn GetData<'a>], | ||
|
@@ -377,7 +373,6 @@ pub type SetTransactionMap = HashMap<String, SetTransaction>; | |
/// | ||
#[derive(Default, Debug)] | ||
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))] | ||
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))] | ||
pub(crate) struct SetTransactionVisitor { | ||
pub(crate) set_transactions: SetTransactionMap, | ||
pub(crate) application_id: Option<String>, | ||
|
@@ -393,8 +388,7 @@ impl SetTransactionVisitor { | |
} | ||
|
||
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))] | ||
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))] | ||
fn visit_txn<'a>( | ||
pub(crate) fn visit_txn<'a>( | ||
row_index: usize, | ||
app_id: String, | ||
getters: &[&'a dyn GetData<'a>], | ||
|
@@ -444,6 +438,52 @@ impl RowVisitor for SetTransactionVisitor { | |
} | ||
} | ||
|
||
/// Row visitor that collects every `Sidecar` action found in the rows it visits. | ||
/// | ||
/// Its `RowVisitor::visit` implementation appends one `Sidecar` to `sidecars` for each row | ||
/// whose `sidecar.path` column is present. `Default` starts with an empty list. | ||
#[allow(unused)] //TODO: Remove once we implement V2 checkpoint file processing | ||
#[derive(Default)] | ||
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))] | ||
pub(crate) struct SidecarVisitor { | ||
/// Sidecar actions collected so far, in the order their rows were visited. | ||
pub(crate) sidecars: Vec<Sidecar>, | ||
} | ||
|
||
impl SidecarVisitor { | ||
/// Builds the `Sidecar` for row `row_index`. | ||
/// | ||
/// `path` is the already-extracted `sidecar.path` value, so `getters[0]` is not read here. | ||
/// The remaining fields come from `getters[1]` (`sidecar.sizeInBytes`), `getters[2]` | ||
/// (`sidecar.modificationTime`) and `getters[3]` (`sidecar.tags`, read with `get_opt`). | ||
/// | ||
/// # Errors | ||
/// Any error returned by a getter is propagated with `?`. | ||
/// | ||
/// # Panics | ||
/// Indexes `getters[1..=3]` directly, so the slice must hold at least four getters; the | ||
/// caller in `RowVisitor::visit` checks for exactly four before calling this. | ||
fn visit_sidecar<'a>( | ||
row_index: usize, | ||
path: String, | ||
getters: &[&'a dyn GetData<'a>], | ||
) -> DeltaResult<Sidecar> { | ||
Ok(Sidecar { | ||
path, | ||
// Read with `get` rather than `get_opt`: presumably a missing value is an error | ||
// here -- TODO confirm against `GetData::get`. | ||
size_in_bytes: getters[1].get(row_index, "sidecar.sizeInBytes")?, | ||
modification_time: getters[2].get(row_index, "sidecar.modificationTime")?, | ||
// Read with `get_opt`, so `tags` may be absent for a row. | ||
tags: getters[3].get_opt(row_index, "sidecar.tags")?, | ||
}) | ||
} | ||
} | ||
|
||
impl RowVisitor for SidecarVisitor { | ||
/// Returns the leaf column names and types of the `Sidecar` schema, rooted at | ||
/// `SIDECAR_NAME`. The list is computed on first use and cached in a `LazyLock` static. | ||
fn selected_column_names_and_types(&self) -> (&'static [ColumnName], &'static [DataType]) { | ||
static NAMES_AND_TYPES: LazyLock<ColumnNamesAndTypes> = | ||
LazyLock::new(|| Sidecar::to_schema().leaves(SIDECAR_NAME)); | ||
NAMES_AND_TYPES.as_ref() | ||
} | ||
/// Scans rows `0..row_count` and appends a `Sidecar` to `self.sidecars` for every row | ||
/// whose `sidecar.path` value is present; rows without a path are skipped. | ||
/// | ||
/// # Errors | ||
/// Fails with `Error::InternalError` unless exactly four getters are supplied | ||
/// (`require!` is a project macro; presumably it returns the error early -- TODO confirm). | ||
/// Getter errors are propagated with `?`. | ||
fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> { | ||
// Four getters are expected because `visit_sidecar` indexes `getters[0..=3]`. | ||
require!( | ||
getters.len() == 4, | ||
Error::InternalError(format!( | ||
"Wrong number of SidecarVisitor getters: {}", | ||
getters.len() | ||
)) | ||
); | ||
for i in 0..row_count { | ||
// Since path column is required, use it to detect presence of a sidecar action | ||
if let Some(path) = getters[0].get_opt(i, "sidecar.path")? { | ||
self.sidecars.push(Self::visit_sidecar(i, path, getters)?); | ||
} | ||
} | ||
Ok(()) | ||
} | ||
} | ||
|
||
/// Get a DV out of some engine data. The caller is responsible for slicing the `getters` slice such | ||
/// that the first element contains the `storageType` element of the deletion vector. | ||
pub(crate) fn visit_deletion_vector_at<'a>( | ||
|
@@ -501,7 +541,8 @@ mod tests { | |
r#"{"commitInfo":{"timestamp":1677811178585,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"10","numOutputBytes":"635"},"engineInfo":"Databricks-Runtime/<unknown>","txnId":"a6a94671-55ef-450e-9546-b8465b9147de"}}"#, | ||
r#"{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["deletionVectors"],"writerFeatures":["deletionVectors"]}}"#, | ||
r#"{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableDeletionVectors":"true","delta.columnMapping.mode":"none", "delta.enableChangeDataFeed":"true"},"createdTime":1677811175819}}"#, | ||
r#"{"cdc":{"path":"_change_data/age=21/cdc-00000-93f7fceb-281a-446a-b221-07b88132d203.c000.snappy.parquet","partitionValues":{"age":"21"},"size":1033,"dataChange":false}}"# | ||
r#"{"cdc":{"path":"_change_data/age=21/cdc-00000-93f7fceb-281a-446a-b221-07b88132d203.c000.snappy.parquet","partitionValues":{"age":"21"},"size":1033,"dataChange":false}}"#, | ||
r#"{"sidecar":{"path":"016ae953-37a9-438e-8683-9a9a4a79a395.parquet","sizeInBytes":9268,"modificationTime":1714496113961,"tags":{"tag_foo":"tag_bar"}}}"#, | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. qq: would a relative path look like this or would it be … There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. A relative path would just be the filename. |
||
] | ||
.into(); | ||
let output_schema = get_log_schema().clone(); | ||
|
@@ -544,6 +585,29 @@ mod tests { | |
Ok(()) | ||
} | ||
|
||
#[test] | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Aside: None of these tests check the error case where a required field is missing 🤔 Maybe we should make an issue to add those. Personally I'd also like to see a … There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. good idea! #674 There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. couldn't we also add a trivial case just for these additions to see it fail if sidecar.path is missing? (and keep the issue to extend to other cases?) |
||
// Runs `SidecarVisitor` over the shared `action_batch()` fixture and checks that exactly one | ||
// `Sidecar` is collected and that it equals the expected value built below. | ||
fn test_parse_sidecar() -> DeltaResult<()> { | ||
let data = action_batch(); | ||
|
||
let mut visitor = SidecarVisitor::default(); | ||
visitor.visit_rows_of(data.as_ref())?; | ||
|
||
// Expected action; presumably mirrors the `sidecar` JSON row in the test batch -- TODO | ||
// confirm that row belongs to `action_batch()`. | ||
let sidecar1 = Sidecar { | ||
path: "016ae953-37a9-438e-8683-9a9a4a79a395.parquet".into(), | ||
size_in_bytes: 9268, | ||
modification_time: 1714496113961, | ||
tags: Some(HashMap::from([( | ||
"tag_foo".to_string(), | ||
"tag_bar".to_string(), | ||
)])), | ||
}; | ||
|
||
// Only one collected action is expected; the other fixture rows must not produce a sidecar. | ||
assert_eq!(visitor.sidecars.len(), 1); | ||
assert_eq!(visitor.sidecars[0], sidecar1); | ||
|
||
Ok(()) | ||
} | ||
|
||
#[test] | ||
fn test_parse_metadata() -> DeltaResult<()> { | ||
let data = action_batch(); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: could add some docs and link to sidecar action in protocol?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good idea 👍