-
Notifications
You must be signed in to change notification settings - Fork 84
Part 3 of expression based transform: Use computed transform #613
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
f305650
a7a440b
83c07a6
c1210b1
2d598d8
ea80224
84c612b
d8a2355
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,8 +12,7 @@ use delta_kernel::engine::arrow_data::ArrowEngineData; | |
use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor; | ||
use delta_kernel::engine::default::DefaultEngine; | ||
use delta_kernel::engine::sync::SyncEngine; | ||
use delta_kernel::scan::state::{DvInfo, GlobalScanState, Stats}; | ||
use delta_kernel::scan::transform_to_logical; | ||
use delta_kernel::scan::state::{transform_to_logical, DvInfo, GlobalScanState, Stats}; | ||
use delta_kernel::schema::Schema; | ||
use delta_kernel::{DeltaResult, Engine, EngineData, ExpressionRef, FileMeta, Table}; | ||
|
||
|
@@ -81,7 +80,7 @@ fn main() -> ExitCode { | |
struct ScanFile { | ||
path: String, | ||
size: i64, | ||
partition_values: HashMap<String, String>, | ||
transform: Option<ExpressionRef>, | ||
dv_info: DvInfo, | ||
} | ||
|
||
|
@@ -111,13 +110,13 @@ fn send_scan_file( | |
size: i64, | ||
_stats: Option<Stats>, | ||
dv_info: DvInfo, | ||
_transform: Option<ExpressionRef>, | ||
partition_values: HashMap<String, String>, | ||
transform: Option<ExpressionRef>, | ||
_: HashMap<String, String>, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should we be able to remove this or does this still prove useful elsewhere? (I know there are a number of spots where we use this callback and pass partition values - curious if we will eventually migrate everything to transform or if this is still actually needed) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We could. My idea was to keep it for now so existing systems could migrate more slowly. Basically, deprecate it |
||
) { | ||
let scan_file = ScanFile { | ||
path: path.to_string(), | ||
size, | ||
partition_values, | ||
transform, | ||
dv_info, | ||
}; | ||
scan_tx.send(scan_file).unwrap(); | ||
|
@@ -258,7 +257,6 @@ fn do_work( | |
) { | ||
// get the type for the function calls | ||
let engine: &dyn Engine = engine.as_ref(); | ||
let physical_schema = scan_state.physical_schema.clone(); | ||
// in a loop, try and get a ScanFile. Note that `recv` will return an `Err` when the other side | ||
// hangs up, which indicates there's no more data to process. | ||
while let Ok(scan_file) = scan_file_rx.recv() { | ||
|
@@ -289,19 +287,19 @@ fn do_work( | |
// vector | ||
let read_results = engine | ||
.get_parquet_handler() | ||
.read_parquet_files(&[meta], physical_schema.clone(), None) | ||
.read_parquet_files(&[meta], scan_state.physical_schema.clone(), None) | ||
.unwrap(); | ||
|
||
for read_result in read_results { | ||
let read_result = read_result.unwrap(); | ||
let len = read_result.len(); | ||
|
||
// ask the kernel to transform the physical data into the correct logical form | ||
// transform the physical data into the correct logical form | ||
let logical = transform_to_logical( | ||
engine, | ||
read_result, | ||
&scan_state, | ||
&scan_file.partition_values, | ||
&scan_state.physical_schema, | ||
&scan_state.logical_schema, | ||
&scan_file.transform, | ||
) | ||
.unwrap(); | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -445,7 +445,6 @@ impl Scan { | |
partition_columns: self.snapshot.metadata().partition_columns.clone(), | ||
logical_schema: self.logical_schema.clone(), | ||
physical_schema: self.physical_schema.clone(), | ||
column_mapping_mode: self.snapshot.column_mapping_mode(), | ||
} | ||
} | ||
|
||
|
@@ -465,22 +464,22 @@ impl Scan { | |
path: String, | ||
size: i64, | ||
dv_info: DvInfo, | ||
partition_values: HashMap<String, String>, | ||
transform: Option<ExpressionRef>, | ||
} | ||
fn scan_data_callback( | ||
batches: &mut Vec<ScanFile>, | ||
path: &str, | ||
size: i64, | ||
_: Option<Stats>, | ||
dv_info: DvInfo, | ||
_transform: Option<ExpressionRef>, | ||
partition_values: HashMap<String, String>, | ||
transform: Option<ExpressionRef>, | ||
_: HashMap<String, String>, | ||
) { | ||
batches.push(ScanFile { | ||
path: path.to_string(), | ||
size, | ||
dv_info, | ||
partition_values, | ||
transform, | ||
}); | ||
} | ||
|
||
|
@@ -492,8 +491,6 @@ impl Scan { | |
let global_state = Arc::new(self.global_scan_state()); | ||
let table_root = self.snapshot.table_root().clone(); | ||
let physical_predicate = self.physical_predicate(); | ||
let all_fields = self.all_fields.clone(); | ||
let have_partition_cols = self.have_partition_cols; | ||
|
||
let scan_data = self.scan_data(engine.as_ref())?; | ||
let scan_files_iter = scan_data | ||
|
@@ -537,17 +534,15 @@ impl Scan { | |
// Arc clones | ||
let engine = engine.clone(); | ||
let global_state = global_state.clone(); | ||
let all_fields = all_fields.clone(); | ||
Ok(read_result_iter.map(move |read_result| -> DeltaResult<_> { | ||
let read_result = read_result?; | ||
// to transform the physical data into the correct logical form | ||
let logical = transform_to_logical_internal( | ||
// transform the physical data into the correct logical form | ||
let logical = state::transform_to_logical( | ||
engine.as_ref(), | ||
read_result, | ||
&global_state, | ||
&scan_file.partition_values, | ||
&all_fields, | ||
have_partition_cols, | ||
&global_state.physical_schema, | ||
&global_state.logical_schema, | ||
&scan_file.transform, | ||
); | ||
let len = logical.as_ref().map_or(0, |res| res.len()); | ||
// need to split the dv_mask. what's left in dv_mask covers this result, and rest | ||
|
@@ -665,73 +660,6 @@ pub fn selection_vector( | |
Ok(deletion_treemap_to_bools(dv_treemap)) | ||
} | ||
|
||
/// Transform the raw data read from parquet into the correct logical form, based on the provided | ||
/// global scan state and partition values | ||
pub fn transform_to_logical( | ||
engine: &dyn Engine, | ||
data: Box<dyn EngineData>, | ||
global_state: &GlobalScanState, | ||
partition_values: &HashMap<String, String>, | ||
) -> DeltaResult<Box<dyn EngineData>> { | ||
let state_info = get_state_info( | ||
&global_state.logical_schema, | ||
&global_state.partition_columns, | ||
)?; | ||
transform_to_logical_internal( | ||
engine, | ||
data, | ||
global_state, | ||
partition_values, | ||
&state_info.all_fields, | ||
state_info.have_partition_cols, | ||
) | ||
} | ||
|
||
// We have this function because `execute` can save `all_fields` and `have_partition_cols` in the | ||
// scan, and then reuse them for each batch transform | ||
fn transform_to_logical_internal( | ||
engine: &dyn Engine, | ||
data: Box<dyn EngineData>, | ||
global_state: &GlobalScanState, | ||
partition_values: &std::collections::HashMap<String, String>, | ||
all_fields: &[ColumnType], | ||
have_partition_cols: bool, | ||
) -> DeltaResult<Box<dyn EngineData>> { | ||
let physical_schema = global_state.physical_schema.clone(); | ||
if !have_partition_cols && global_state.column_mapping_mode == ColumnMappingMode::None { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IIRC, this code was annoying when I added column mapping support for expression eval. I think this was the only code left that specifically tracked or cared about column mapping mode, because of the new way logical -> physical transforms are performed. Recommend to audit the caller chain and see what other code can be simplified, now that we don't need column mapping logic here any more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good call! I've removed it from |
||
return Ok(data); | ||
} | ||
// need to add back partition cols and/or fix-up mapped columns | ||
let all_fields = all_fields | ||
.iter() | ||
.map(|field| match field { | ||
ColumnType::Partition(field_idx) => { | ||
let field = global_state.logical_schema.fields.get_index(*field_idx); | ||
let Some((_, field)) = field else { | ||
return Err(Error::generic( | ||
"logical schema did not contain expected field, can't transform data", | ||
)); | ||
}; | ||
let name = field.physical_name(); | ||
let value_expression = | ||
parse_partition_value(partition_values.get(name), field.data_type())?; | ||
Ok(value_expression.into()) | ||
} | ||
ColumnType::Selected(field_name) => Ok(ColumnName::new([field_name]).into()), | ||
}) | ||
.try_collect()?; | ||
let read_expression = Expression::Struct(all_fields); | ||
let result = engine | ||
.get_expression_handler() | ||
.get_evaluator( | ||
physical_schema, | ||
read_expression, | ||
global_state.logical_schema.clone().into(), | ||
) | ||
.evaluate(data.as_ref())?; | ||
Ok(result) | ||
} | ||
|
||
// some utils that are used in file_stream.rs and state.rs tests | ||
#[cfg(test)] | ||
pub(crate) mod test_utils { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,13 +5,13 @@ use std::sync::LazyLock; | |
|
||
use crate::actions::deletion_vector::deletion_treemap_to_bools; | ||
use crate::scan::get_transform_for_row; | ||
use crate::schema::Schema; | ||
use crate::utils::require; | ||
use crate::ExpressionRef; | ||
use crate::{ | ||
actions::{deletion_vector::DeletionVectorDescriptor, visitors::visit_deletion_vector_at}, | ||
engine_data::{GetData, RowVisitor, TypedGetData as _}, | ||
schema::{ColumnName, ColumnNamesAndTypes, DataType, SchemaRef}, | ||
table_features::ColumnMappingMode, | ||
DeltaResult, Engine, EngineData, Error, | ||
}; | ||
use roaring::RoaringTreemap; | ||
|
@@ -27,7 +27,6 @@ pub struct GlobalScanState { | |
pub partition_columns: Vec<String>, | ||
pub logical_schema: SchemaRef, | ||
pub physical_schema: SchemaRef, | ||
pub column_mapping_mode: ColumnMappingMode, | ||
} | ||
|
||
/// this struct can be used by an engine to materialize a selection vector | ||
|
@@ -100,6 +99,28 @@ impl DvInfo { | |
} | ||
} | ||
|
||
/// utility function for applying a transform expression to convert data from physical to logical | ||
/// format | ||
pub fn transform_to_logical( | ||
engine: &dyn Engine, | ||
physical_data: Box<dyn EngineData>, | ||
physical_schema: &SchemaRef, | ||
logical_schema: &Schema, | ||
transform: &Option<ExpressionRef>, | ||
) -> DeltaResult<Box<dyn EngineData>> { | ||
match transform { | ||
Some(ref transform) => engine | ||
.get_expression_handler() | ||
.get_evaluator( | ||
physical_schema.clone(), | ||
transform.as_ref().clone(), // TODO: Maybe eval should take a ref | ||
logical_schema.clone().into(), | ||
) | ||
.evaluate(physical_data.as_ref()), | ||
None => Ok(physical_data), | ||
} | ||
} | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. doc nit: Line 142/143 below might benefit from listing the new There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yep, nice catch, added |
||
pub type ScanCallback<T> = fn( | ||
context: &mut T, | ||
path: &str, | ||
|
@@ -118,6 +139,8 @@ pub type ScanCallback<T> = fn( | |
/// * `path`: a `&str` which is the path to the file | ||
/// * `size`: an `i64` which is the size of the file | ||
/// * `dv_info`: a [`DvInfo`] struct, which allows getting the selection vector for this file | ||
/// * `transform`: An optional expression that, if present, _must_ be applied to physical data to convert it to | ||
/// the correct logical format | ||
/// * `partition_values`: a `HashMap<String, String>` which are partition values | ||
/// | ||
/// ## Context | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
??
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah -- the partition values aware version of
transform_to_logical_internal
is what disappeared.