diff --git a/ffi/src/scan.rs b/ffi/src/scan.rs index 51ca075b1..a38c5ff27 100644 --- a/ffi/src/scan.rs +++ b/ffi/src/scan.rs @@ -349,26 +349,19 @@ fn row_indexes_from_dv_impl( // Wrapper function that gets called by the kernel, transforms the arguments to make the ffi-able, // and then calls the ffi specified callback -fn rust_callback( - context: &mut ContextWrapper, - path: &str, - size: i64, - kernel_stats: Option, - dv_info: DvInfo, - partition_values: HashMap, -) { +fn rust_callback(context: &mut ContextWrapper, file: ScanFile) { let partition_map = CStringMap { - values: partition_values, + values: file.partition_values, }; - let stats = kernel_stats.map(|ks| Stats { + let stats = file.stats.map(|ks| Stats { num_records: ks.num_records, }); (context.callback)( context.engine_context, - kernel_string_slice!(path), - size, + kernel_string_slice!(file.path), + file.size, stats.as_ref(), - &dv_info, + &file.dv_info, &partition_map, ); } diff --git a/kernel/examples/read-table-multi-threaded/src/main.rs b/kernel/examples/read-table-multi-threaded/src/main.rs index 69770a1aa..a44ba3819 100644 --- a/kernel/examples/read-table-multi-threaded/src/main.rs +++ b/kernel/examples/read-table-multi-threaded/src/main.rs @@ -12,7 +12,7 @@ use delta_kernel::engine::arrow_data::ArrowEngineData; use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor; use delta_kernel::engine::default::DefaultEngine; use delta_kernel::engine::sync::SyncEngine; -use delta_kernel::scan::state::{DvInfo, GlobalScanState, Stats}; +use delta_kernel::scan::state::{DvInfo, GlobalScanState, ScanFile, Stats}; use delta_kernel::scan::transform_to_logical; use delta_kernel::schema::Schema; use delta_kernel::{DeltaResult, Engine, EngineData, FileMeta, Table}; @@ -76,15 +76,6 @@ fn main() -> ExitCode { } } -// the way we as a connector represent data to scan. this is computed from the raw data returned -// from the scan, and could be any format the engine chooses to use to facilitate distributing work. -struct ScanFile { - path: String, - size: i64, - partition_values: HashMap, - dv_info: DvInfo, -} - // we know we're using arrow under the hood, so cast an EngineData into something we can work with fn to_arrow(data: Box) -> DeltaResult { Ok(data @@ -104,22 +95,9 @@ fn truncate_batch(batch: RecordBatch, rows: usize) -> RecordBatch { RecordBatch::try_new(batch.schema(), cols).unwrap() } -// This is the callback that will be called fo each valid scan row -fn send_scan_file( - scan_tx: &mut spmc::Sender, - path: &str, - size: i64, - _stats: Option, - dv_info: DvInfo, - partition_values: HashMap, -) { - let scan_file = ScanFile { - path: path.to_string(), - size, - partition_values, - dv_info, - }; - scan_tx.send(scan_file).unwrap(); +// This is the callback that will be called for each valid scan row +fn send_scan_file(scan_tx: &mut spmc::Sender, file: ScanFile) { + scan_tx.send(file).unwrap(); } fn try_main() -> DeltaResult<()> { diff --git a/kernel/src/scan/log_replay.rs b/kernel/src/scan/log_replay.rs index fb5c2b0fa..f3c077723 100644 --- a/kernel/src/scan/log_replay.rs +++ b/kernel/src/scan/log_replay.rs @@ -256,32 +256,26 @@ pub fn scan_action_iter( #[cfg(test)] mod tests { - use std::collections::HashMap; - use crate::scan::{ - state::{DvInfo, Stats}, + state, test_utils::{add_batch_simple, add_batch_with_remove, run_with_validate_callback}, }; // dv-info is more complex to validate, we validate that works in the test for visit_scan_files // in state.rs - fn validate_simple( - _: &mut (), - path: &str, - size: i64, - stats: Option, - _: DvInfo, - part_vals: HashMap, - ) { + fn validate_simple(_: &mut (), file: state::ScanFile) { assert_eq!( - path, + file.path, "part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet" ); - assert_eq!(size, 635); - assert!(stats.is_some()); - assert_eq!(stats.as_ref().unwrap().num_records, 10); - assert_eq!(part_vals.get("date"), Some(&"2017-12-10".to_string())); - assert_eq!(part_vals.get("non-existent"), None); + assert_eq!(file.size, 635); + assert!(file.stats.is_some()); + assert_eq!(file.stats.as_ref().unwrap().num_records, 10); + assert_eq!( + file.partition_values.get("date"), + Some(&"2017-12-10".to_string()) + ); + assert_eq!(file.partition_values.get("non-existent"), None); } #[test] diff --git a/kernel/src/scan/mod.rs b/kernel/src/scan/mod.rs index 9ccfa8a76..112829923 100644 --- a/kernel/src/scan/mod.rs +++ b/kernel/src/scan/mod.rs @@ -13,7 +13,6 @@ use crate::actions::deletion_vector::{ }; use crate::actions::{get_log_add_schema, get_log_schema, ADD_NAME, REMOVE_NAME}; use crate::expressions::{ColumnName, Expression, ExpressionRef, ExpressionTransform, Scalar}; -use crate::scan::state::{DvInfo, Stats}; use crate::schema::{ ArrayType, DataType, MapType, PrimitiveType, Schema, SchemaRef, SchemaTransform, StructField, StructType, @@ -428,33 +427,13 @@ impl Scan { &self, engine: Arc, ) -> DeltaResult> + '_> { - struct ScanFile { - path: String, - size: i64, - dv_info: DvInfo, - partition_values: HashMap, + fn scan_data_callback(batches: &mut Vec, file: state::ScanFile) { + batches.push(file); } - fn scan_data_callback( - batches: &mut Vec, - path: &str, - size: i64, - _: Option, - dv_info: DvInfo, - partition_values: HashMap, - ) { - batches.push(ScanFile { - path: path.to_string(), - size, - dv_info, - partition_values, - }); - } - debug!( "Executing scan with logical schema {:#?} and physical schema {:#?}", self.logical_schema, self.physical_schema ); - let global_state = Arc::new(self.global_scan_state()); let scan_data = self.scan_data(engine.as_ref())?; let scan_files_iter = scan_data @@ -963,16 +942,9 @@ mod tests { fn get_files_for_scan(scan: Scan, engine: &dyn Engine) -> DeltaResult> { let scan_data = scan.scan_data(engine)?; - fn scan_data_callback( - paths: &mut Vec, - path: &str, - _size: i64, - _: Option, - dv_info: DvInfo, - _partition_values: HashMap, - ) { - paths.push(path.to_string()); - assert!(dv_info.deletion_vector.is_none()); + fn scan_data_callback(paths: &mut Vec, file: state::ScanFile) { + paths.push(file.path); + assert!(file.dv_info.deletion_vector.is_none()); } let mut files = vec![]; for data in scan_data { diff --git a/kernel/src/scan/state.rs b/kernel/src/scan/state.rs index 12bbed552..cd6af868c 100644 --- a/kernel/src/scan/state.rs +++ b/kernel/src/scan/state.rs @@ -98,14 +98,17 @@ impl DvInfo { } } -pub type ScanCallback = fn( - context: &mut T, - path: &str, - size: i64, - stats: Option, - dv_info: DvInfo, - partition_values: HashMap, -); +/// A struct containing all the information needed for a scan file callback +#[derive(Debug, Clone)] +pub struct ScanFile { + pub path: String, + pub size: i64, + pub stats: Option, + pub dv_info: DvInfo, + pub partition_values: HashMap, +} + +pub type ScanCallback = fn(context: &mut T, file: ScanFile); /// Request that the kernel call a callback on each valid file that needs to be read for the /// scan. @@ -176,7 +179,8 @@ impl RowVisitor for ScanFileVisitor<'_, T> { continue; } // Since path column is required, use it to detect presence of an Add action - if let Some(path) = getters[0].get_opt(row_index, "scanFile.path")? { + let path: Option = getters[0].get_opt(row_index, "scanFile.path")?; + if let Some(path) = path { let size = getters[1].get(row_index, "scanFile.size")?; let stats: Option = getters[3].get_opt(row_index, "scanFile.stats")?; let stats: Option = @@ -195,14 +199,15 @@ impl RowVisitor for ScanFileVisitor<'_, T> { let dv_info = DvInfo { deletion_vector }; let partition_values = getters[9].get(row_index, "scanFile.fileConstantValues.partitionValues")?; - (self.callback)( - &mut self.context, - path, + + let scan_file = ScanFile { + path: path.to_string(), size, stats, dv_info, partition_values, - ) + }; + (self.callback)(&mut self.context, scan_file) } } Ok(()) @@ -211,36 +216,30 @@ impl RowVisitor for ScanFileVisitor<'_, T> { #[cfg(test)] mod tests { - use std::collections::HashMap; - use crate::scan::test_utils::{add_batch_simple, run_with_validate_callback}; - use super::{DvInfo, Stats}; + use super::ScanFile; #[derive(Clone)] struct TestContext { id: usize, } - fn validate_visit( - context: &mut TestContext, - path: &str, - size: i64, - stats: Option, - dv_info: DvInfo, - part_vals: HashMap, - ) { + fn validate_visit(context: &mut TestContext, file: ScanFile) { assert_eq!( - path, + file.path, "part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet" ); - assert_eq!(size, 635); - assert!(stats.is_some()); - assert_eq!(stats.as_ref().unwrap().num_records, 10); - assert_eq!(part_vals.get("date"), Some(&"2017-12-10".to_string())); - assert_eq!(part_vals.get("non-existent"), None); - assert!(dv_info.deletion_vector.is_some()); - let dv = dv_info.deletion_vector.unwrap(); + assert_eq!(file.size, 635); + assert!(file.stats.is_some()); + assert_eq!(file.stats.as_ref().unwrap().num_records, 10); + assert_eq!( + file.partition_values.get("date"), + Some(&"2017-12-10".to_string()) + ); + assert_eq!(file.partition_values.get("non-existent"), None); + assert!(file.dv_info.deletion_vector.is_some()); + let dv = file.dv_info.deletion_vector.unwrap(); assert_eq!(dv.unique_id(), "uvBn[lx{q8@P<9BNH/isA@1"); assert_eq!(context.id, 2); } diff --git a/kernel/tests/read.rs b/kernel/tests/read.rs index 7a674ce57..cae3b5bba 100644 --- a/kernel/tests/read.rs +++ b/kernel/tests/read.rs @@ -1,4 +1,3 @@ -use std::collections::HashMap; use std::ops::Not; use std::path::PathBuf; use std::sync::Arc; @@ -11,7 +10,8 @@ use delta_kernel::engine::arrow_data::ArrowEngineData; use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor; use delta_kernel::engine::default::DefaultEngine; use delta_kernel::expressions::{column_expr, BinaryOperator, Expression}; -use delta_kernel::scan::state::{visit_scan_files, DvInfo, Stats}; +use delta_kernel::scan::state; +use delta_kernel::scan::state::visit_scan_files; use delta_kernel::scan::{transform_to_logical, Scan}; use delta_kernel::schema::{DataType, Schema}; use delta_kernel::{Engine, FileMeta, Table}; @@ -335,27 +335,8 @@ fn read_with_execute( Ok(()) } -struct ScanFile { - path: String, - size: i64, - dv_info: DvInfo, - partition_values: HashMap, -} - -fn scan_data_callback( - batches: &mut Vec, - path: &str, - size: i64, - _stats: Option, - dv_info: DvInfo, - partition_values: HashMap, -) { - batches.push(ScanFile { - path: path.to_string(), - size, - dv_info, - partition_values, - }); +fn scan_data_callback(files: &mut Vec, file: state::ScanFile) { + files.push(file); } fn read_with_scan_data(