diff --git a/kernel/src/engine/arrow_expression.rs b/kernel/src/engine/arrow_expression.rs
index 8ee54ebd0..9f09a5fdd 100644
--- a/kernel/src/engine/arrow_expression.rs
+++ b/kernel/src/engine/arrow_expression.rs
@@ -519,12 +519,12 @@ impl ExpressionHandler for ArrowExpressionHandler {
         schema: SchemaRef,
         expression: Expression,
         output_type: DataType,
-    ) -> Arc<dyn ExpressionEvaluator> {
-        Arc::new(DefaultExpressionEvaluator {
+    ) -> DeltaResult<Arc<dyn ExpressionEvaluator>> {
+        Ok(Arc::new(DefaultExpressionEvaluator {
             input_schema: schema,
             expression: Box::new(expression),
             output_type,
-        })
+        }))
     }
 }
 
diff --git a/kernel/src/engine/default/mod.rs b/kernel/src/engine/default/mod.rs
index d89cf29cd..012e13315 100644
--- a/kernel/src/engine/default/mod.rs
+++ b/kernel/src/engine/default/mod.rs
@@ -128,7 +128,7 @@ impl<E: TaskExecutor> DefaultEngine<E> {
             input_schema.into(),
             transform.clone(),
             output_schema.clone().into(),
-        );
+        )?;
         let physical_data = logical_to_physical_expr.evaluate(data)?;
         self.parquet
             .write_parquet_file(
diff --git a/kernel/src/lib.rs b/kernel/src/lib.rs
index e771379d1..574846214 100644
--- a/kernel/src/lib.rs
+++ b/kernel/src/lib.rs
@@ -332,7 +332,7 @@ pub trait ExpressionHandler: AsAny {
         schema: SchemaRef,
         expression: Expression,
         output_type: DataType,
-    ) -> Arc<dyn ExpressionEvaluator>;
+    ) -> DeltaResult<Arc<dyn ExpressionEvaluator>>;
 }
 
 /// Provides file system related functionalities to Delta Kernel.
diff --git a/kernel/src/scan/data_skipping.rs b/kernel/src/scan/data_skipping.rs
index 54eb5344c..66c2c622f 100644
--- a/kernel/src/scan/data_skipping.rs
+++ b/kernel/src/scan/data_skipping.rs
@@ -2,7 +2,7 @@ use std::borrow::Cow;
 use std::cmp::Ordering;
 use std::sync::{Arc, LazyLock};
 
-use tracing::debug;
+use tracing::{debug, warn};
 
 use crate::actions::get_log_add_schema;
 use crate::actions::visitors::SelectionVectorVisitor;
@@ -99,24 +99,32 @@ impl DataSkippingFilter {
         //
         // 3. The selection evaluator does DISTINCT(col(predicate), 'false') to produce true (= keep) when
         // the predicate is true/null and false (= skip) when the predicate is false.
-        let select_stats_evaluator = engine.get_expression_handler().get_evaluator(
-            // safety: kernel is very broken if we don't have the schema for Add actions
-            get_log_add_schema().clone(),
-            STATS_EXPR.clone(),
-            DataType::STRING,
-        );
+        let select_stats_evaluator = engine
+            .get_expression_handler()
+            .get_evaluator(
+                // safety: kernel is very broken if we don't have the schema for Add actions
+                get_log_add_schema().clone(),
+                STATS_EXPR.clone(),
+                DataType::STRING,
+            )
+            .inspect_err(|e| warn!("Failed to create stats selector evaluator: {}", e))
+            .ok()?;
 
-        let skipping_evaluator = engine.get_expression_handler().get_evaluator(
-            stats_schema.clone(),
-            Expr::struct_from([as_data_skipping_predicate(&predicate, false)?]),
-            PREDICATE_SCHEMA.clone(),
-        );
+        let skipping_evaluator = engine
+            .get_expression_handler()
+            .get_evaluator(
+                stats_schema.clone(),
+                Expr::struct_from([as_data_skipping_predicate(&predicate, false)?]),
+                PREDICATE_SCHEMA.clone(),
+            )
+            .inspect_err(|e| warn!("Failed to create skipping evaluator: {}", e))
+            .ok()?;
 
-        let filter_evaluator = engine.get_expression_handler().get_evaluator(
-            stats_schema.clone(),
-            FILTER_EXPR.clone(),
-            DataType::BOOLEAN,
-        );
+        let filter_evaluator = engine
+            .get_expression_handler()
+            .get_evaluator(stats_schema.clone(), FILTER_EXPR.clone(), DataType::BOOLEAN)
+            .inspect_err(|e| warn!("Failed to create filter evaluator: {}", e))
+            .ok()?;
 
         Some(Self {
             stats_schema,
diff --git a/kernel/src/scan/log_replay.rs b/kernel/src/scan/log_replay.rs
index fb5c2b0fa..b1a388102 100644
--- a/kernel/src/scan/log_replay.rs
+++ b/kernel/src/scan/log_replay.rs
@@ -237,21 +237,24 @@ impl LogReplayScanner {
 /// indicates whether the record batch is a log or checkpoint batch.
 pub fn scan_action_iter(
     engine: &dyn Engine,
-    action_iter: impl Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>>,
+    action_iter: impl Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>> + 'static,
     physical_predicate: Option<(ExpressionRef, SchemaRef)>,
-) -> impl Iterator<Item = DeltaResult<(Box<dyn EngineData>, Vec<bool>)>> {
+) -> DeltaResult<impl Iterator<Item = DeltaResult<(Box<dyn EngineData>, Vec<bool>)>>> {
     let mut log_scanner = LogReplayScanner::new(engine, physical_predicate);
     let add_transform = engine.get_expression_handler().get_evaluator(
         get_log_add_schema().clone(),
         get_add_transform_expr(),
         SCAN_ROW_DATATYPE.clone(),
-    );
-    action_iter
+    )?;
+
+    let actions = action_iter
         .map(move |action_res| {
             let (batch, is_log_batch) = action_res?;
             log_scanner.process_scan_batch(add_transform.as_ref(), batch.as_ref(), is_log_batch)
         })
-        .filter(|res| res.as_ref().map_or(true, |(_, sv)| sv.contains(&true)))
+        .filter(|res| res.as_ref().map_or(true, |(_, sv)| sv.contains(&true)));
+
+    Ok(actions)
 }
 
 #[cfg(test)]
@@ -291,7 +294,8 @@ mod tests {
             &[true, false],
             (),
             validate_simple,
-        );
+        )
+        .unwrap();
     }
 
     #[test]
@@ -301,6 +305,7 @@ mod tests {
             &[false, false, true, false],
             (),
             validate_simple,
-        );
+        )
+        .unwrap();
     }
 }
diff --git a/kernel/src/scan/mod.rs b/kernel/src/scan/mod.rs
index 360e2bcb1..5fb3ddc76 100644
--- a/kernel/src/scan/mod.rs
+++ b/kernel/src/scan/mod.rs
@@ -381,11 +381,13 @@ impl Scan {
             PhysicalPredicate::Some(predicate, schema) => Some((predicate, schema)),
             PhysicalPredicate::None => None,
         };
+
         let it = scan_action_iter(
             engine,
             self.replay_for_scan_data(engine)?,
             physical_predicate,
-        );
+        )?;
+
         Ok(Some(it).into_iter().flatten())
     }
 
@@ -687,7 +689,7 @@ fn transform_to_logical_internal(
             physical_schema,
             read_expression,
             global_state.logical_schema.clone().into(),
-        )
+        )?
         .evaluate(data.as_ref())?;
     Ok(result)
 }
@@ -707,7 +709,7 @@ pub(crate) mod test_utils {
             sync::{json::SyncJsonHandler, SyncEngine},
         },
         scan::log_replay::scan_action_iter,
-        EngineData, JsonHandler,
+        DeltaResult, EngineData, JsonHandler,
     };
 
     use super::state::ScanCallback;
@@ -759,26 +761,26 @@ pub(crate) mod test_utils {
         expected_sel_vec: &[bool],
         context: T,
         validate_callback: ScanCallback<T>,
-    ) {
+    ) -> DeltaResult<()> {
         let iter = scan_action_iter(
             &SyncEngine::new(),
             batch.into_iter().map(|batch| Ok((batch as _, true))),
             None,
-        );
+        )?;
 
         let mut batch_count = 0;
         for res in iter {
-            let (batch, sel) = res.unwrap();
-            assert_eq!(sel, expected_sel_vec);
+            let (batch, sel) = res?;
+            assert_eq!(sel.as_slice(), expected_sel_vec);
             crate::scan::state::visit_scan_files(
                 batch.as_ref(),
                 &sel,
                 context.clone(),
                 validate_callback,
-            )
-            .unwrap();
+            )?;
             batch_count += 1;
         }
         assert_eq!(batch_count, 1);
+        Ok(())
     }
 }
diff --git a/kernel/src/scan/state.rs b/kernel/src/scan/state.rs
index b57f0c120..a75e4cf8f 100644
--- a/kernel/src/scan/state.rs
+++ b/kernel/src/scan/state.rs
@@ -253,6 +253,7 @@ mod tests {
             &[true, false],
             context,
             validate_visit,
-        );
+        )
+        .unwrap();
     }
 }
diff --git a/kernel/src/table_changes/log_replay.rs b/kernel/src/table_changes/log_replay.rs
index 89951a39b..53f436d2e 100644
--- a/kernel/src/table_changes/log_replay.rs
+++ b/kernel/src/table_changes/log_replay.rs
@@ -239,8 +239,7 @@ impl LogReplayScanner {
             get_log_add_schema().clone(),
             cdf_scan_row_expression(timestamp, commit_version),
             cdf_scan_row_schema().into(),
-        );
-
+        )?;
         let result = action_iter.map(move |actions| -> DeltaResult<_> {
             let actions = actions?;
 
diff --git a/kernel/src/table_changes/scan.rs b/kernel/src/table_changes/scan.rs
index 0902bd2d0..f3c820cec 100644
--- a/kernel/src/table_changes/scan.rs
+++ b/kernel/src/table_changes/scan.rs
@@ -289,7 +289,8 @@ fn read_scan_file(
         physical_schema.clone(),
         physical_to_logical_expr,
         global_state.logical_schema.clone().into(),
-    );
+    )?;
+
     // Determine if the scan file was derived from a deletion vector pair
     let is_dv_resolved_pair = scan_file.remove_dv.is_some();
 
diff --git a/kernel/src/transaction.rs b/kernel/src/transaction.rs
index c6e93ea7b..178bf4a19 100644
--- a/kernel/src/transaction.rs
+++ b/kernel/src/transaction.rs
@@ -201,7 +201,7 @@ fn generate_adds<'a>(
             write_metadata_schema.clone(),
             adds_expr,
             log_schema.clone().into(),
-        );
+        )?;
         adds_evaluator.evaluate(write_metadata_batch)
     })
 }
@@ -321,7 +321,7 @@ fn generate_commit_info(
         engine_commit_info_schema.into(),
         commit_info_expr,
         commit_info_empty_struct_schema.into(),
-    );
+    )?;
     commit_info_evaluator.evaluate(engine_commit_info)
 }