diff --git a/kernel/examples/read-table-multi-threaded/src/main.rs b/kernel/examples/read-table-multi-threaded/src/main.rs
index c4e97328f..9e2cee88c 100644
--- a/kernel/examples/read-table-multi-threaded/src/main.rs
+++ b/kernel/examples/read-table-multi-threaded/src/main.rs
@@ -12,8 +12,7 @@ use delta_kernel::engine::arrow_data::ArrowEngineData;
 use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
 use delta_kernel::engine::default::DefaultEngine;
 use delta_kernel::engine::sync::SyncEngine;
-use delta_kernel::scan::state::{DvInfo, GlobalScanState, Stats};
-use delta_kernel::scan::transform_to_logical;
+use delta_kernel::scan::state::{transform_to_logical, DvInfo, GlobalScanState, Stats};
 use delta_kernel::schema::Schema;
 use delta_kernel::{DeltaResult, Engine, EngineData, ExpressionRef, FileMeta, Table};

@@ -81,7 +80,7 @@
 struct ScanFile {
     path: String,
     size: i64,
-    partition_values: HashMap<String, String>,
+    transform: Option<ExpressionRef>,
     dv_info: DvInfo,
 }

@@ -111,13 +110,13 @@ fn send_scan_file(
     size: i64,
     _stats: Option<Stats>,
     dv_info: DvInfo,
-    _transform: Option<ExpressionRef>,
-    partition_values: HashMap<String, String>,
+    transform: Option<ExpressionRef>,
+    _: HashMap<String, String>,
 ) {
     let scan_file = ScanFile {
         path: path.to_string(),
         size,
-        partition_values,
+        transform,
         dv_info,
     };
     scan_tx.send(scan_file).unwrap();
 }
@@ -258,7 +257,6 @@ fn do_work(
 ) {
     // get the type for the function calls
     let engine: &dyn Engine = engine.as_ref();
-    let physical_schema = scan_state.physical_schema.clone();
     // in a loop, try and get a ScanFile. Note that `recv` will return an `Err` when the other side
     // hangs up, which indicates there's no more data to process.
     while let Ok(scan_file) = scan_file_rx.recv() {
@@ -289,19 +287,19 @@ fn do_work(
         // vector
         let read_results = engine
             .get_parquet_handler()
-            .read_parquet_files(&[meta], physical_schema.clone(), None)
+            .read_parquet_files(&[meta], scan_state.physical_schema.clone(), None)
             .unwrap();

         for read_result in read_results {
             let read_result = read_result.unwrap();
             let len = read_result.len();
-
-            // ask the kernel to transform the physical data into the correct logical form
+            // transform the physical data into the correct logical form
             let logical = transform_to_logical(
                 engine,
                 read_result,
-                &scan_state,
-                &scan_file.partition_values,
+                &scan_state.physical_schema,
+                &scan_state.logical_schema,
+                &scan_file.transform,
             )
             .unwrap();

diff --git a/kernel/src/scan/mod.rs b/kernel/src/scan/mod.rs
index 7abebd0c6..14e2ee50f 100644
--- a/kernel/src/scan/mod.rs
+++ b/kernel/src/scan/mod.rs
@@ -445,7 +445,6 @@ impl Scan {
             partition_columns: self.snapshot.metadata().partition_columns.clone(),
             logical_schema: self.logical_schema.clone(),
             physical_schema: self.physical_schema.clone(),
-            column_mapping_mode: self.snapshot.column_mapping_mode(),
         }
     }

@@ -465,7 +464,7 @@ impl Scan {
             path: String,
             size: i64,
             dv_info: DvInfo,
-            partition_values: HashMap<String, String>,
+            transform: Option<ExpressionRef>,
         }
         fn scan_data_callback(
             batches: &mut Vec<ScanFile>,
@@ -473,14 +472,14 @@ impl Scan {
             size: i64,
             _: Option<Stats>,
             dv_info: DvInfo,
-            _transform: Option<ExpressionRef>,
-            partition_values: HashMap<String, String>,
+            transform: Option<ExpressionRef>,
+            _: HashMap<String, String>,
         ) {
             batches.push(ScanFile {
                 path: path.to_string(),
                 size,
                 dv_info,
-                partition_values,
+                transform,
             });
         }

@@ -492,8 +491,6 @@ impl Scan {
         let global_state = Arc::new(self.global_scan_state());
         let table_root = self.snapshot.table_root().clone();
         let physical_predicate = self.physical_predicate();
-        let all_fields = self.all_fields.clone();
-        let have_partition_cols = self.have_partition_cols;
         let scan_data = self.scan_data(engine.as_ref())?;
         let scan_files_iter = scan_data
@@ -537,17 +534,15 @@ impl Scan {
             // Arc clones
             let engine = engine.clone();
             let global_state = global_state.clone();
-            let all_fields = all_fields.clone();
             Ok(read_result_iter.map(move |read_result| -> DeltaResult<_> {
                 let read_result = read_result?;
-                // to transform the physical data into the correct logical form
-                let logical = transform_to_logical_internal(
+                // transform the physical data into the correct logical form
+                let logical = state::transform_to_logical(
                     engine.as_ref(),
                     read_result,
-                    &global_state,
-                    &scan_file.partition_values,
-                    &all_fields,
-                    have_partition_cols,
+                    &global_state.physical_schema,
+                    &global_state.logical_schema,
+                    &scan_file.transform,
                 );
                 let len = logical.as_ref().map_or(0, |res| res.len());
                 // need to split the dv_mask. what's left in dv_mask covers this result, and rest
@@ -665,73 +660,6 @@ pub fn selection_vector(
     Ok(deletion_treemap_to_bools(dv_treemap))
 }

-/// Transform the raw data read from parquet into the correct logical form, based on the provided
-/// global scan state and partition values
-pub fn transform_to_logical(
-    engine: &dyn Engine,
-    data: Box<dyn EngineData>,
-    global_state: &GlobalScanState,
-    partition_values: &HashMap<String, String>,
-) -> DeltaResult<Box<dyn EngineData>> {
-    let state_info = get_state_info(
-        &global_state.logical_schema,
-        &global_state.partition_columns,
-    )?;
-    transform_to_logical_internal(
-        engine,
-        data,
-        global_state,
-        partition_values,
-        &state_info.all_fields,
-        state_info.have_partition_cols,
-    )
-}
-
-// We have this function because `execute` can save `all_fields` and `have_partition_cols` in the
-// scan, and then reuse them for each batch transform
-fn transform_to_logical_internal(
-    engine: &dyn Engine,
-    data: Box<dyn EngineData>,
-    global_state: &GlobalScanState,
-    partition_values: &std::collections::HashMap<String, String>,
-    all_fields: &[ColumnType],
-    have_partition_cols: bool,
-) -> DeltaResult<Box<dyn EngineData>> {
-    let physical_schema = global_state.physical_schema.clone();
-    if !have_partition_cols && global_state.column_mapping_mode == ColumnMappingMode::None {
-        return Ok(data);
-    }
-    // need to add back partition cols and/or fix-up mapped columns
-    let all_fields = all_fields
-        .iter()
-        .map(|field| match field {
-            ColumnType::Partition(field_idx) => {
-                let field = global_state.logical_schema.fields.get_index(*field_idx);
-                let Some((_, field)) = field else {
-                    return Err(Error::generic(
-                        "logical schema did not contain expected field, can't transform data",
-                    ));
-                };
-                let name = field.physical_name();
-                let value_expression =
-                    parse_partition_value(partition_values.get(name), field.data_type())?;
-                Ok(value_expression.into())
-            }
-            ColumnType::Selected(field_name) => Ok(ColumnName::new([field_name]).into()),
-        })
-        .try_collect()?;
-    let read_expression = Expression::Struct(all_fields);
-    let result = engine
-        .get_expression_handler()
-        .get_evaluator(
-            physical_schema,
-            read_expression,
-            global_state.logical_schema.clone().into(),
-        )
-        .evaluate(data.as_ref())?;
-    Ok(result)
-}
-
 // some utils that are used in file_stream.rs and state.rs tests
 #[cfg(test)]
 pub(crate) mod test_utils {
diff --git a/kernel/src/scan/state.rs b/kernel/src/scan/state.rs
index c2496b682..e78f109bc 100644
--- a/kernel/src/scan/state.rs
+++ b/kernel/src/scan/state.rs
@@ -5,13 +5,13 @@ use std::sync::LazyLock;

 use crate::actions::deletion_vector::deletion_treemap_to_bools;
 use crate::scan::get_transform_for_row;
+use crate::schema::Schema;
 use crate::utils::require;
 use crate::ExpressionRef;
 use crate::{
     actions::{deletion_vector::DeletionVectorDescriptor, visitors::visit_deletion_vector_at},
     engine_data::{GetData, RowVisitor, TypedGetData as _},
     schema::{ColumnName, ColumnNamesAndTypes, DataType, SchemaRef},
-    table_features::ColumnMappingMode,
     DeltaResult, Engine, EngineData, Error,
 };
 use roaring::RoaringTreemap;
@@ -27,7 +27,6 @@ pub struct GlobalScanState {
     pub partition_columns: Vec<String>,
     pub logical_schema: SchemaRef,
     pub physical_schema: SchemaRef,
-    pub column_mapping_mode: ColumnMappingMode,
 }

 /// this struct can be used by an engine to materialize a selection vector
@@ -100,6 +99,29 @@ impl DvInfo {
     }
 }

+/// utility function for applying a transform expression to convert data from physical to logical
+/// format
+pub fn transform_to_logical(
+    engine: &dyn Engine,
+    physical_data: Box<dyn EngineData>,
+    physical_schema: &SchemaRef,
+    logical_schema: &Schema,
+    transform: &Option<ExpressionRef>,
+) -> DeltaResult<Box<dyn EngineData>> {
+    if let Some(ref transform) = transform {
+        engine
+            .get_expression_handler()
+            .get_evaluator(
+                physical_schema.clone(),
+                transform.as_ref().clone(), // TODO: Maybe eval should take a ref
+                logical_schema.clone().into(),
+            )
+            .evaluate(physical_data.as_ref())
+    } else {
+        Ok(physical_data)
+    }
+}
+
 pub type ScanCallback<T> = fn(
     context: &mut T,
     path: &str,
diff --git a/kernel/src/table_changes/scan.rs b/kernel/src/table_changes/scan.rs
index 0c2ff3eed..4265e4805 100644
--- a/kernel/src/table_changes/scan.rs
+++ b/kernel/src/table_changes/scan.rs
@@ -211,7 +211,6 @@ impl TableChangesScan {
             partition_columns: end_snapshot.metadata().partition_columns.clone(),
             logical_schema: self.logical_schema.clone(),
             physical_schema: self.physical_schema.clone(),
-            column_mapping_mode: end_snapshot.column_mapping_mode(),
         }
     }

diff --git a/kernel/tests/read.rs b/kernel/tests/read.rs
index c5e8a4b5b..9d5d24314 100644
--- a/kernel/tests/read.rs
+++ b/kernel/tests/read.rs
@@ -11,8 +11,8 @@ use delta_kernel::engine::arrow_data::ArrowEngineData;
 use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
 use delta_kernel::engine::default::DefaultEngine;
 use delta_kernel::expressions::{column_expr, BinaryOperator, Expression, ExpressionRef};
-use delta_kernel::scan::state::{visit_scan_files, DvInfo, Stats};
-use delta_kernel::scan::{transform_to_logical, Scan};
+use delta_kernel::scan::state::{transform_to_logical, visit_scan_files, DvInfo, Stats};
+use delta_kernel::scan::Scan;
 use delta_kernel::schema::{DataType, Schema};
 use delta_kernel::{Engine, FileMeta, Table};
 use object_store::{memory::InMemory, path::Path, ObjectStore};
@@ -339,7 +339,7 @@ struct ScanFile {
     path: String,
     size: i64,
     dv_info: DvInfo,
-    partition_values: HashMap<String, String>,
+    transform: Option<ExpressionRef>,
 }

 fn scan_data_callback(
@@ -348,14 +348,14 @@
     size: i64,
     _stats: Option<Stats>,
     dv_info: DvInfo,
-    _transforms: Option<ExpressionRef>,
-    partition_values: HashMap<String, String>,
+    transform: Option<ExpressionRef>,
+    _: HashMap<String, String>,
 ) {
     batches.push(ScanFile {
         path: path.to_string(),
         size,
         dv_info,
-        partition_values,
+        transform,
     });
 }

@@ -404,16 +404,15 @@ fn read_with_scan_data(
         for read_result in read_results {
             let read_result = read_result.unwrap();
             let len = read_result.len();
-
-            // ask the kernel to transform the physical data into the correct logical form
+            // to transform the physical data into the correct logical form
             let logical = transform_to_logical(
                 engine,
                 read_result,
-                &global_state,
-                &scan_file.partition_values,
+                &global_state.physical_schema,
+                &global_state.logical_schema,
+                &scan_file.transform,
             )
             .unwrap();
-
             let record_batch = to_arrow(logical).unwrap();
             let rest = split_vector(selection_vector.as_mut(), len, Some(true));
             let batch = if let Some(mask) = selection_vector.clone() {
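
Taken together, the patch moves the physical-to-logical conversion behind a per-file transform expression: the scan-file callback now hands engines an `Option<ExpressionRef>` instead of raw partition values, and `scan::state::transform_to_logical` applies that expression using the schemas already carried by `GlobalScanState`. The sketch below shows how an engine-side read loop might wrap the new call. It is a minimal sketch assuming the signatures shown in this diff; `to_logical_batch` and its parameters are hypothetical names for illustration, not part of the kernel API.

```rust
use delta_kernel::scan::state::{transform_to_logical, GlobalScanState};
use delta_kernel::{DeltaResult, Engine, EngineData, ExpressionRef};

/// Hypothetical engine-side helper: convert one batch of physical parquet data into the
/// table's logical shape, using the transform captured by the scan-file callback.
fn to_logical_batch(
    engine: &dyn Engine,
    global_state: &GlobalScanState,
    transform: &Option<ExpressionRef>,
    physical_data: Box<dyn EngineData>,
) -> DeltaResult<Box<dyn EngineData>> {
    // A `None` transform means the physical data already matches the logical schema and is
    // passed through unchanged; otherwise the kernel-provided expression re-adds partition
    // columns and fixes up mapped column names.
    transform_to_logical(
        engine,
        physical_data,
        &global_state.physical_schema,
        &global_state.logical_schema,
        transform,
    )
}
```

Because `Scan::execute` now routes through the same `state::transform_to_logical`, engine code following this pattern shares one conversion path with the kernel instead of re-deriving partition and column-mapping handling itself.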