Skip to content

Part 3 of expression based transform: Use computed transform #613

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits
Feb 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 10 additions & 12 deletions kernel/examples/read-table-multi-threaded/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
use delta_kernel::engine::default::DefaultEngine;
use delta_kernel::engine::sync::SyncEngine;
use delta_kernel::scan::state::{DvInfo, GlobalScanState, Stats};
use delta_kernel::scan::transform_to_logical;
use delta_kernel::scan::state::{transform_to_logical, DvInfo, GlobalScanState, Stats};
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove the old transform_to_logical code

??

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah -- the partition values aware version of transform_to_logical_internal is what disappeared.

use delta_kernel::schema::Schema;
use delta_kernel::{DeltaResult, Engine, EngineData, ExpressionRef, FileMeta, Table};

Expand Down Expand Up @@ -81,7 +80,7 @@ fn main() -> ExitCode {
struct ScanFile {
path: String,
size: i64,
partition_values: HashMap<String, String>,
transform: Option<ExpressionRef>,
dv_info: DvInfo,
}

Expand Down Expand Up @@ -111,13 +110,13 @@ fn send_scan_file(
size: i64,
_stats: Option<Stats>,
dv_info: DvInfo,
_transform: Option<ExpressionRef>,
partition_values: HashMap<String, String>,
transform: Option<ExpressionRef>,
_: HashMap<String, String>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we be able to remove this or does this still prove useful elsewhere? (I know there are a number of spots where we use this callback and pass partition values - curious if we will eventually migrate everything to transform or if this is still actually needed)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could. My idea was to keep it for now so existing systems could migrate more slowly. Basically, deprecate it

) {
let scan_file = ScanFile {
path: path.to_string(),
size,
partition_values,
transform,
dv_info,
};
scan_tx.send(scan_file).unwrap();
Expand Down Expand Up @@ -258,7 +257,6 @@ fn do_work(
) {
// get the type for the function calls
let engine: &dyn Engine = engine.as_ref();
let physical_schema = scan_state.physical_schema.clone();
// in a loop, try and get a ScanFile. Note that `recv` will return an `Err` when the other side
// hangs up, which indicates there's no more data to process.
while let Ok(scan_file) = scan_file_rx.recv() {
Expand Down Expand Up @@ -289,19 +287,19 @@ fn do_work(
// vector
let read_results = engine
.get_parquet_handler()
.read_parquet_files(&[meta], physical_schema.clone(), None)
.read_parquet_files(&[meta], scan_state.physical_schema.clone(), None)
.unwrap();

for read_result in read_results {
let read_result = read_result.unwrap();
let len = read_result.len();

// ask the kernel to transform the physical data into the correct logical form
// transform the physical data into the correct logical form
let logical = transform_to_logical(
engine,
read_result,
&scan_state,
&scan_file.partition_values,
&scan_state.physical_schema,
&scan_state.logical_schema,
&scan_file.transform,
)
.unwrap();

Expand Down
90 changes: 9 additions & 81 deletions kernel/src/scan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,6 @@ impl Scan {
partition_columns: self.snapshot.metadata().partition_columns.clone(),
logical_schema: self.logical_schema.clone(),
physical_schema: self.physical_schema.clone(),
column_mapping_mode: self.snapshot.column_mapping_mode(),
}
}

Expand All @@ -465,22 +464,22 @@ impl Scan {
path: String,
size: i64,
dv_info: DvInfo,
partition_values: HashMap<String, String>,
transform: Option<ExpressionRef>,
}
fn scan_data_callback(
batches: &mut Vec<ScanFile>,
path: &str,
size: i64,
_: Option<Stats>,
dv_info: DvInfo,
_transform: Option<ExpressionRef>,
partition_values: HashMap<String, String>,
transform: Option<ExpressionRef>,
_: HashMap<String, String>,
) {
batches.push(ScanFile {
path: path.to_string(),
size,
dv_info,
partition_values,
transform,
});
}

Expand All @@ -492,8 +491,6 @@ impl Scan {
let global_state = Arc::new(self.global_scan_state());
let table_root = self.snapshot.table_root().clone();
let physical_predicate = self.physical_predicate();
let all_fields = self.all_fields.clone();
let have_partition_cols = self.have_partition_cols;

let scan_data = self.scan_data(engine.as_ref())?;
let scan_files_iter = scan_data
Expand Down Expand Up @@ -537,17 +534,15 @@ impl Scan {
// Arc clones
let engine = engine.clone();
let global_state = global_state.clone();
let all_fields = all_fields.clone();
Ok(read_result_iter.map(move |read_result| -> DeltaResult<_> {
let read_result = read_result?;
// to transform the physical data into the correct logical form
let logical = transform_to_logical_internal(
// transform the physical data into the correct logical form
let logical = state::transform_to_logical(
engine.as_ref(),
read_result,
&global_state,
&scan_file.partition_values,
&all_fields,
have_partition_cols,
&global_state.physical_schema,
&global_state.logical_schema,
&scan_file.transform,
);
let len = logical.as_ref().map_or(0, |res| res.len());
// need to split the dv_mask. what's left in dv_mask covers this result, and rest
Expand Down Expand Up @@ -665,73 +660,6 @@ pub fn selection_vector(
Ok(deletion_treemap_to_bools(dv_treemap))
}

/// Transform the raw data read from parquet into the correct logical form, based on the provided
/// global scan state and partition values
pub fn transform_to_logical(
engine: &dyn Engine,
data: Box<dyn EngineData>,
global_state: &GlobalScanState,
partition_values: &HashMap<String, String>,
) -> DeltaResult<Box<dyn EngineData>> {
let state_info = get_state_info(
&global_state.logical_schema,
&global_state.partition_columns,
)?;
transform_to_logical_internal(
engine,
data,
global_state,
partition_values,
&state_info.all_fields,
state_info.have_partition_cols,
)
}

// We have this function because `execute` can save `all_fields` and `have_partition_cols` in the
// scan, and then reuse them for each batch transform
fn transform_to_logical_internal(
engine: &dyn Engine,
data: Box<dyn EngineData>,
global_state: &GlobalScanState,
partition_values: &std::collections::HashMap<String, String>,
all_fields: &[ColumnType],
have_partition_cols: bool,
) -> DeltaResult<Box<dyn EngineData>> {
let physical_schema = global_state.physical_schema.clone();
if !have_partition_cols && global_state.column_mapping_mode == ColumnMappingMode::None {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC, this code was annoying when I added column mapping support for expression eval. I think this was the only code left that specifically tracked or cared about column mapping mode, because of the new way logical -> physical transforms are performed.

Recommend to audit the caller chain and see what other code can be simplified, now that we don't need column mapping logic here any more.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call! I've removed it from GlobalScanState as we don't need it there anymore

return Ok(data);
}
// need to add back partition cols and/or fix-up mapped columns
let all_fields = all_fields
.iter()
.map(|field| match field {
ColumnType::Partition(field_idx) => {
let field = global_state.logical_schema.fields.get_index(*field_idx);
let Some((_, field)) = field else {
return Err(Error::generic(
"logical schema did not contain expected field, can't transform data",
));
};
let name = field.physical_name();
let value_expression =
parse_partition_value(partition_values.get(name), field.data_type())?;
Ok(value_expression.into())
}
ColumnType::Selected(field_name) => Ok(ColumnName::new([field_name]).into()),
})
.try_collect()?;
let read_expression = Expression::Struct(all_fields);
let result = engine
.get_expression_handler()
.get_evaluator(
physical_schema,
read_expression,
global_state.logical_schema.clone().into(),
)
.evaluate(data.as_ref())?;
Ok(result)
}

// some utils that are used in file_stream.rs and state.rs tests
#[cfg(test)]
pub(crate) mod test_utils {
Expand Down
27 changes: 25 additions & 2 deletions kernel/src/scan/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ use std::sync::LazyLock;

use crate::actions::deletion_vector::deletion_treemap_to_bools;
use crate::scan::get_transform_for_row;
use crate::schema::Schema;
use crate::utils::require;
use crate::ExpressionRef;
use crate::{
actions::{deletion_vector::DeletionVectorDescriptor, visitors::visit_deletion_vector_at},
engine_data::{GetData, RowVisitor, TypedGetData as _},
schema::{ColumnName, ColumnNamesAndTypes, DataType, SchemaRef},
table_features::ColumnMappingMode,
DeltaResult, Engine, EngineData, Error,
};
use roaring::RoaringTreemap;
Expand All @@ -27,7 +27,6 @@ pub struct GlobalScanState {
pub partition_columns: Vec<String>,
pub logical_schema: SchemaRef,
pub physical_schema: SchemaRef,
pub column_mapping_mode: ColumnMappingMode,
}

/// this struct can be used by an engine to materialize a selection vector
Expand Down Expand Up @@ -100,6 +99,28 @@ impl DvInfo {
}
}

/// Utility function for applying a transform expression to convert data from physical to logical
/// format.
///
/// # Parameters
/// - `engine`: the [`Engine`] whose expression handler evaluates the transform
/// - `physical_data`: a batch of data in the physical (on-disk) schema
/// - `physical_schema`: the schema `physical_data` conforms to
/// - `logical_schema`: the schema the returned data will conform to
/// - `transform`: optional expression mapping physical rows to logical rows; when `None`, the
///   physical and logical forms are identical and `physical_data` is returned unchanged
///
/// # Errors
/// Returns any error produced by evaluating the transform expression.
pub fn transform_to_logical(
    engine: &dyn Engine,
    physical_data: Box<dyn EngineData>,
    physical_schema: &SchemaRef,
    logical_schema: &Schema,
    transform: &Option<ExpressionRef>,
) -> DeltaResult<Box<dyn EngineData>> {
    match transform {
        // NB: no `ref` needed — matching through `&Option<_>` already binds by reference
        Some(transform) => engine
            .get_expression_handler()
            .get_evaluator(
                physical_schema.clone(),
                transform.as_ref().clone(), // TODO: Maybe eval should take a ref
                logical_schema.clone().into(),
            )
            .evaluate(physical_data.as_ref()),
        None => Ok(physical_data),
    }
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doc nit: Line 142/143 below might benefit from listing the new transform field

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep, nice catch, added

pub type ScanCallback<T> = fn(
context: &mut T,
path: &str,
Expand All @@ -118,6 +139,8 @@ pub type ScanCallback<T> = fn(
/// * `path`: a `&str` which is the path to the file
/// * `size`: an `i64` which is the size of the file
/// * `dv_info`: a [`DvInfo`] struct, which allows getting the selection vector for this file
/// * `transform`: An optional expression that, if present, _must_ be applied to physical data to convert it to
/// the correct logical format
/// * `partition_values`: a `HashMap<String, String>` which are partition values
///
/// ## Context
Expand Down
1 change: 0 additions & 1 deletion kernel/src/table_changes/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,6 @@ impl TableChangesScan {
partition_columns: end_snapshot.metadata().partition_columns.clone(),
logical_schema: self.logical_schema.clone(),
physical_schema: self.physical_schema.clone(),
column_mapping_mode: end_snapshot.column_mapping_mode(),
}
}

Expand Down
21 changes: 10 additions & 11 deletions kernel/tests/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
use delta_kernel::engine::default::DefaultEngine;
use delta_kernel::expressions::{column_expr, BinaryOperator, Expression, ExpressionRef};
use delta_kernel::scan::state::{visit_scan_files, DvInfo, Stats};
use delta_kernel::scan::{transform_to_logical, Scan};
use delta_kernel::scan::state::{transform_to_logical, visit_scan_files, DvInfo, Stats};
use delta_kernel::scan::Scan;
use delta_kernel::schema::{DataType, Schema};
use delta_kernel::{Engine, FileMeta, Table};
use object_store::{memory::InMemory, path::Path, ObjectStore};
Expand Down Expand Up @@ -339,7 +339,7 @@ struct ScanFile {
path: String,
size: i64,
dv_info: DvInfo,
partition_values: HashMap<String, String>,
transform: Option<ExpressionRef>,
}

fn scan_data_callback(
Expand All @@ -348,14 +348,14 @@ fn scan_data_callback(
size: i64,
_stats: Option<Stats>,
dv_info: DvInfo,
_transforms: Option<ExpressionRef>,
partition_values: HashMap<String, String>,
transform: Option<ExpressionRef>,
_: HashMap<String, String>,
) {
batches.push(ScanFile {
path: path.to_string(),
size,
dv_info,
partition_values,
transform,
});
}

Expand Down Expand Up @@ -404,16 +404,15 @@ fn read_with_scan_data(
for read_result in read_results {
let read_result = read_result.unwrap();
let len = read_result.len();

// ask the kernel to transform the physical data into the correct logical form
// transform the physical data into the correct logical form
let logical = transform_to_logical(
engine,
read_result,
&global_state,
&scan_file.partition_values,
&global_state.physical_schema,
&global_state.logical_schema,
&scan_file.transform,
)
.unwrap();

let record_batch = to_arrow(logical).unwrap();
let rest = split_vector(selection_vector.as_mut(), len, Some(true));
let batch = if let Some(mask) = selection_vector.clone() {
Expand Down
Loading