From c6025aa133e1becaef5fa9d50e1dbac701296772 Mon Sep 17 00:00:00 2001 From: "devin-ai-integration[bot]" <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Sat, 7 Dec 2024 22:07:05 +0000 Subject: [PATCH 01/13] feat: make ExpressionHandler::get_evaluator fallible Make get_evaluator() return DeltaResult> to properly handle potential errors. Update all call sites to handle the Result type and simplify error handling using the ok()? operator where appropriate. --- kernel/src/engine/arrow_expression.rs | 6 ++--- kernel/src/engine/default/mod.rs | 2 +- kernel/src/lib.rs | 2 +- kernel/src/scan/data_skipping.rs | 37 +++++++++++++++----------- kernel/src/scan/log_replay.rs | 29 +++++++++++++------- kernel/src/scan/mod.rs | 2 +- kernel/src/table_changes/log_replay.rs | 2 +- kernel/src/transaction.rs | 4 +-- 8 files changed, 49 insertions(+), 35 deletions(-) diff --git a/kernel/src/engine/arrow_expression.rs b/kernel/src/engine/arrow_expression.rs index 8ee54ebd0..9f09a5fdd 100644 --- a/kernel/src/engine/arrow_expression.rs +++ b/kernel/src/engine/arrow_expression.rs @@ -519,12 +519,12 @@ impl ExpressionHandler for ArrowExpressionHandler { schema: SchemaRef, expression: Expression, output_type: DataType, - ) -> Arc { - Arc::new(DefaultExpressionEvaluator { + ) -> DeltaResult> { + Ok(Arc::new(DefaultExpressionEvaluator { input_schema: schema, expression: Box::new(expression), output_type, - }) + })) } } diff --git a/kernel/src/engine/default/mod.rs b/kernel/src/engine/default/mod.rs index d89cf29cd..012e13315 100644 --- a/kernel/src/engine/default/mod.rs +++ b/kernel/src/engine/default/mod.rs @@ -128,7 +128,7 @@ impl DefaultEngine { input_schema.into(), transform.clone(), output_schema.clone().into(), - ); + )?; let physical_data = logical_to_physical_expr.evaluate(data)?; self.parquet .write_parquet_file( diff --git a/kernel/src/lib.rs b/kernel/src/lib.rs index 110a822e7..a863143ef 100644 --- a/kernel/src/lib.rs +++ b/kernel/src/lib.rs @@ -305,7 +305,7 @@ pub trait ExpressionHandler: AsAny { schema: SchemaRef, expression: Expression, output_type: DataType, - ) -> Arc; + ) -> DeltaResult>; } /// Provides file system related functionalities to Delta Kernel. diff --git a/kernel/src/scan/data_skipping.rs b/kernel/src/scan/data_skipping.rs index ad7816f74..0215af520 100644 --- a/kernel/src/scan/data_skipping.rs +++ b/kernel/src/scan/data_skipping.rs @@ -117,24 +117,29 @@ impl DataSkippingFilter { // // 3. The selection evaluator does DISTINCT(col(predicate), 'false') to produce true (= keep) when // the predicate is true/null and false (= skip) when the predicate is false. - let select_stats_evaluator = engine.get_expression_handler().get_evaluator( - // safety: kernel is very broken if we don't have the schema for Add actions - get_log_add_schema().clone(), - STATS_EXPR.clone(), - DataType::STRING, - ); + let select_stats_evaluator = engine + .get_expression_handler() + .get_evaluator( + // safety: kernel is very broken if we don't have the schema for Add actions + get_log_add_schema().clone(), + STATS_EXPR.clone(), + DataType::STRING, + ) + .ok()?; - let skipping_evaluator = engine.get_expression_handler().get_evaluator( - stats_schema.clone(), - Expr::struct_from([as_data_skipping_predicate(predicate, false)?]), - PREDICATE_SCHEMA.clone(), - ); + let skipping_evaluator = engine + .get_expression_handler() + .get_evaluator( + stats_schema.clone(), + Expr::struct_from([as_data_skipping_predicate(predicate, false)?]), + PREDICATE_SCHEMA.clone(), + ) + .ok()?; - let filter_evaluator = engine.get_expression_handler().get_evaluator( - stats_schema.clone(), - FILTER_EXPR.clone(), - DataType::BOOLEAN, - ); + let filter_evaluator = engine + .get_expression_handler() + .get_evaluator(stats_schema.clone(), FILTER_EXPR.clone(), DataType::BOOLEAN) + .ok()?; Some(Self { stats_schema, diff --git a/kernel/src/scan/log_replay.rs b/kernel/src/scan/log_replay.rs index bf713ae7a..035557a68 100644 --- a/kernel/src/scan/log_replay.rs +++ b/kernel/src/scan/log_replay.rs @@ -1,5 +1,6 @@ use std::clone::Clone; use std::collections::HashSet; +use std::iter; use std::sync::{Arc, LazyLock}; use tracing::debug; @@ -241,22 +242,30 @@ impl LogReplayScanner { /// indicates whether the record batch is a log or checkpoint batch. pub fn scan_action_iter( engine: &dyn Engine, - action_iter: impl Iterator, bool)>>, + action_iter: impl Iterator, bool)>> + 'static, table_schema: &SchemaRef, predicate: Option, -) -> impl Iterator> { +) -> Box>> { let mut log_scanner = LogReplayScanner::new(engine, table_schema, predicate); - let add_transform = engine.get_expression_handler().get_evaluator( + match engine.get_expression_handler().get_evaluator( get_log_add_schema().clone(), get_add_transform_expr(), SCAN_ROW_DATATYPE.clone(), - ); - action_iter - .map(move |action_res| { - let (batch, is_log_batch) = action_res?; - log_scanner.process_scan_batch(add_transform.as_ref(), batch.as_ref(), is_log_batch) - }) - .filter(|res| res.as_ref().map_or(true, |(_, sv)| sv.contains(&true))) + ) { + Ok(add_transform) => Box::new( + action_iter + .map(move |action_res| { + let (batch, is_log_batch) = action_res?; + log_scanner.process_scan_batch( + add_transform.as_ref(), + batch.as_ref(), + is_log_batch, + ) + }) + .filter(|res| res.as_ref().map_or(true, |(_, sv)| sv.contains(&true))), + ), + Err(e) => Box::new(iter::once(Err(e))), + } } #[cfg(test)] diff --git a/kernel/src/scan/mod.rs b/kernel/src/scan/mod.rs index a60361179..af3c6a1e1 100644 --- a/kernel/src/scan/mod.rs +++ b/kernel/src/scan/mod.rs @@ -503,7 +503,7 @@ fn transform_to_logical_internal( read_schema, read_expression, global_state.logical_schema.clone().into(), - ) + )? .evaluate(data.as_ref())?; Ok(result) } diff --git a/kernel/src/table_changes/log_replay.rs b/kernel/src/table_changes/log_replay.rs index 993a89912..211a329c3 100644 --- a/kernel/src/table_changes/log_replay.rs +++ b/kernel/src/table_changes/log_replay.rs @@ -251,7 +251,7 @@ impl LogReplayScanner { get_log_add_schema().clone(), add_transform_expr(), scan_row_schema().into(), - ); + )?; let result = action_iter.map(move |actions| -> DeltaResult<_> { let actions = actions?; diff --git a/kernel/src/transaction.rs b/kernel/src/transaction.rs index c6e93ea7b..178bf4a19 100644 --- a/kernel/src/transaction.rs +++ b/kernel/src/transaction.rs @@ -201,7 +201,7 @@ fn generate_adds<'a>( write_metadata_schema.clone(), adds_expr, log_schema.clone().into(), - ); + )?; adds_evaluator.evaluate(write_metadata_batch) }) } @@ -321,7 +321,7 @@ fn generate_commit_info( engine_commit_info_schema.into(), commit_info_expr, commit_info_empty_struct_schema.into(), - ); + )?; commit_info_evaluator.evaluate(engine_commit_info) } From f243d42378883ed6941dea4d9afd6f29bab2274c Mon Sep 17 00:00:00 2001 From: Calvin Giroud Date: Mon, 9 Dec 2024 16:44:19 -0500 Subject: [PATCH 02/13] fix merge --- kernel/src/table_changes/log_replay.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/kernel/src/table_changes/log_replay.rs b/kernel/src/table_changes/log_replay.rs index 9c6cfe872..92a575598 100644 --- a/kernel/src/table_changes/log_replay.rs +++ b/kernel/src/table_changes/log_replay.rs @@ -237,11 +237,13 @@ impl LogReplayScanner { .version .try_into() .map_err(|_| Error::generic("Failed to convert commit version to i64"))?; - let evaluator = engine.get_expression_handler().get_evaluator( - get_log_add_schema().clone(), - cdf_scan_row_expression(timestamp, commit_version), - cdf_scan_row_schema().into(), - ); + let evaluator = engine + .get_expression_handler() + .get_evaluator( + get_log_add_schema().clone(), + cdf_scan_row_expression(timestamp, commit_version), + cdf_scan_row_schema().into(), + )?; let result = action_iter.map(move |actions| -> DeltaResult<_> { let actions = actions?; From 1e03c5fc3623d1dc3ff6fe6490560f29d46edbee Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 11 Dec 2024 01:14:49 +0000 Subject: [PATCH 03/13] refactor: improve error handling in scan_action_iter and related functions - Modified scan_action_iter to return DeltaResult - Fixed code formatting in log_replay.rs files - Reordered imports in mod.rs for better readability - Improved error propagation in transaction.rs Co-Authored-By: Calvin Giroud --- kernel/src/scan/log_replay.rs | 27 ++++++++++---------------- kernel/src/scan/mod.rs | 18 ++++++++--------- kernel/src/table_changes/log_replay.rs | 13 +++++-------- 3 files changed, 24 insertions(+), 34 deletions(-) diff --git a/kernel/src/scan/log_replay.rs b/kernel/src/scan/log_replay.rs index 035557a68..b10da166e 100644 --- a/kernel/src/scan/log_replay.rs +++ b/kernel/src/scan/log_replay.rs @@ -245,27 +245,20 @@ pub fn scan_action_iter( action_iter: impl Iterator, bool)>> + 'static, table_schema: &SchemaRef, predicate: Option, -) -> Box>> { +) -> DeltaResult>> { let mut log_scanner = LogReplayScanner::new(engine, table_schema, predicate); - match engine.get_expression_handler().get_evaluator( + let add_transform = engine.get_expression_handler().get_evaluator( get_log_add_schema().clone(), get_add_transform_expr(), SCAN_ROW_DATATYPE.clone(), - ) { - Ok(add_transform) => Box::new( - action_iter - .map(move |action_res| { - let (batch, is_log_batch) = action_res?; - log_scanner.process_scan_batch( - add_transform.as_ref(), - batch.as_ref(), - is_log_batch, - ) - }) - .filter(|res| res.as_ref().map_or(true, |(_, sv)| sv.contains(&true))), - ), - Err(e) => Box::new(iter::once(Err(e))), - } + )?; + + Ok(action_iter + .map(move |action_res| { + let (batch, is_log_batch) = action_res?; + log_scanner.process_scan_batch(add_transform.as_ref(), batch.as_ref(), is_log_batch) + }) + .filter(|res| res.as_ref().map_or(true, |(_, sv)| sv.contains(&true)))) } #[cfg(test)] diff --git a/kernel/src/scan/mod.rs b/kernel/src/scan/mod.rs index 413956398..500f854ea 100644 --- a/kernel/src/scan/mod.rs +++ b/kernel/src/scan/mod.rs @@ -219,12 +219,12 @@ impl Scan { &self, engine: &dyn Engine, ) -> DeltaResult>> { - Ok(scan_action_iter( + scan_action_iter( engine, self.replay_for_scan_data(engine)?, &self.logical_schema, self.predicate(), - )) + ) } // Factored out to facilitate testing @@ -526,7 +526,7 @@ pub(crate) mod test_utils { }, scan::log_replay::scan_action_iter, schema::{StructField, StructType}, - EngineData, JsonHandler, + DeltaResult, EngineData, JsonHandler, }; use super::state::ScanCallback; @@ -578,7 +578,7 @@ pub(crate) mod test_utils { expected_sel_vec: &[bool], context: T, validate_callback: ScanCallback, - ) { + ) -> DeltaResult<()> { let engine = SyncEngine::new(); // doesn't matter here let table_schema = Arc::new(StructType::new([StructField::new( @@ -591,21 +591,21 @@ pub(crate) mod test_utils { batch.into_iter().map(|batch| Ok((batch as _, true))), &table_schema, None, - ); + )?; let mut batch_count = 0; for res in iter { - let (batch, sel) = res.unwrap(); - assert_eq!(sel, expected_sel_vec); + let (batch, sel) = res?; + assert_eq!(sel.as_slice(), expected_sel_vec); crate::scan::state::visit_scan_files( batch.as_ref(), &sel, context.clone(), validate_callback, - ) - .unwrap(); + )?; batch_count += 1; } assert_eq!(batch_count, 1); + Ok(()) } } diff --git a/kernel/src/table_changes/log_replay.rs b/kernel/src/table_changes/log_replay.rs index 92a575598..8168f56e2 100644 --- a/kernel/src/table_changes/log_replay.rs +++ b/kernel/src/table_changes/log_replay.rs @@ -237,14 +237,11 @@ impl LogReplayScanner { .version .try_into() .map_err(|_| Error::generic("Failed to convert commit version to i64"))?; - let evaluator = engine - .get_expression_handler() - .get_evaluator( - get_log_add_schema().clone(), - cdf_scan_row_expression(timestamp, commit_version), - cdf_scan_row_schema().into(), - )?; - + let evaluator = engine.get_expression_handler().get_evaluator( + get_log_add_schema().clone(), + cdf_scan_row_expression(timestamp, commit_version), + cdf_scan_row_schema().into(), + )?; let result = action_iter.map(move |actions| -> DeltaResult<_> { let actions = actions?; From 2588b3e86da8e8d623a88e4b1cd4ea9100bb0c36 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 11 Dec 2024 01:15:20 +0000 Subject: [PATCH 04/13] feat: add warning logs for data skipping errors Added warning logs when evaluator creation fails in DataSkippingFilter: - Log stats selector evaluator failures - Log skipping evaluator failures - Log filter evaluator failures This improves debuggability when data skipping does not occur as expected. Co-Authored-By: Calvin Giroud --- kernel/src/scan/data_skipping.rs | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/kernel/src/scan/data_skipping.rs b/kernel/src/scan/data_skipping.rs index 0215af520..7a2329ac6 100644 --- a/kernel/src/scan/data_skipping.rs +++ b/kernel/src/scan/data_skipping.rs @@ -3,7 +3,7 @@ use std::cmp::Ordering; use std::collections::HashSet; use std::sync::{Arc, LazyLock}; -use tracing::debug; +use tracing::{debug, warn}; use crate::actions::get_log_add_schema; use crate::actions::visitors::SelectionVectorVisitor; @@ -125,6 +125,10 @@ impl DataSkippingFilter { STATS_EXPR.clone(), DataType::STRING, ) + .map_err(|e| { + warn!("Failed to create stats selector evaluator: {}", e); + e + }) .ok()?; let skipping_evaluator = engine @@ -134,11 +138,19 @@ impl DataSkippingFilter { Expr::struct_from([as_data_skipping_predicate(predicate, false)?]), PREDICATE_SCHEMA.clone(), ) + .map_err(|e| { + warn!("Failed to create skipping evaluator: {}", e); + e + }) .ok()?; let filter_evaluator = engine .get_expression_handler() .get_evaluator(stats_schema.clone(), FILTER_EXPR.clone(), DataType::BOOLEAN) + .map_err(|e| { + warn!("Failed to create filter evaluator: {}", e); + e + }) .ok()?; Some(Self { From 9f6e617768d3a996bb3bb3d0877668e8ae1c74b4 Mon Sep 17 00:00:00 2001 From: Calvin Giroud Date: Tue, 10 Dec 2024 20:54:38 -0500 Subject: [PATCH 05/13] fix merge --- kernel/src/scan/data_skipping.rs | 2 ++ kernel/src/scan/log_replay.rs | 1 - kernel/src/scan/mod.rs | 20 -------------------- kernel/src/table_changes/scan.rs | 2 +- 4 files changed, 3 insertions(+), 22 deletions(-) diff --git a/kernel/src/scan/data_skipping.rs b/kernel/src/scan/data_skipping.rs index 99db9d480..bf00f2029 100644 --- a/kernel/src/scan/data_skipping.rs +++ b/kernel/src/scan/data_skipping.rs @@ -142,6 +142,8 @@ impl DataSkippingFilter { filter_evaluator, json_handler: engine.get_json_handler(), }) + } + /// Apply the DataSkippingFilter to an EngineData batch of actions. Returns a selection vector /// which can be applied to the actions to find those that passed data skipping. pub(crate) fn apply(&self, actions: &dyn EngineData) -> DeltaResult> { diff --git a/kernel/src/scan/log_replay.rs b/kernel/src/scan/log_replay.rs index d47c7f234..428de6b9d 100644 --- a/kernel/src/scan/log_replay.rs +++ b/kernel/src/scan/log_replay.rs @@ -1,6 +1,5 @@ use std::clone::Clone; use std::collections::HashSet; -use std::iter; use std::sync::{Arc, LazyLock}; use tracing::debug; diff --git a/kernel/src/scan/mod.rs b/kernel/src/scan/mod.rs index 3bd5ba851..d1c5c9cfc 100644 --- a/kernel/src/scan/mod.rs +++ b/kernel/src/scan/mod.rs @@ -704,15 +704,8 @@ pub(crate) mod test_utils { sync::{json::SyncJsonHandler, SyncEngine}, }, scan::log_replay::scan_action_iter, -<<<<<<< HEAD schema::{StructField, StructType}, DeltaResult, EngineData, JsonHandler, -||||||| 5816620 - schema::{StructField, StructType}, - EngineData, JsonHandler, -======= - EngineData, JsonHandler, ->>>>>>> upstream/main }; use super::state::ScanCallback; @@ -764,7 +757,6 @@ pub(crate) mod test_utils { expected_sel_vec: &[bool], context: T, validate_callback: ScanCallback, -<<<<<<< HEAD ) -> DeltaResult<()> { let engine = SyncEngine::new(); // doesn't matter here @@ -773,18 +765,6 @@ pub(crate) mod test_utils { crate::schema::DataType::STRING, false, )])); -||||||| 5816620 - ) { - let engine = SyncEngine::new(); - // doesn't matter here - let table_schema = Arc::new(StructType::new([StructField::new( - "foo", - crate::schema::DataType::STRING, - false, - )])); -======= - ) { ->>>>>>> upstream/main let iter = scan_action_iter( &SyncEngine::new(), batch.into_iter().map(|batch| Ok((batch as _, true))), diff --git a/kernel/src/table_changes/scan.rs b/kernel/src/table_changes/scan.rs index b40eaa4c6..bcc29bcd8 100644 --- a/kernel/src/table_changes/scan.rs +++ b/kernel/src/table_changes/scan.rs @@ -281,7 +281,7 @@ fn read_scan_file( physical_schema.clone(), physical_to_logical_expr, global_state.logical_schema.clone().into(), - ); + )?; let table_root = Url::parse(&global_state.table_root)?; let location = table_root.join(&scan_file.path)?; From 91bfcd8db40d6e911fe8bc5f1f2ad1892765725c Mon Sep 17 00:00:00 2001 From: Calvin Giroud Date: Tue, 10 Dec 2024 20:56:01 -0500 Subject: [PATCH 06/13] remove comment --- kernel/src/scan/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/kernel/src/scan/mod.rs b/kernel/src/scan/mod.rs index d1c5c9cfc..75daa1a24 100644 --- a/kernel/src/scan/mod.rs +++ b/kernel/src/scan/mod.rs @@ -382,7 +382,6 @@ impl Scan { PhysicalPredicate::None => None, }; - // Call scan_action_iter with proper error handling let it = scan_action_iter( engine, self.replay_for_scan_data(engine)?, From 7d03266b37bd1aedc241ad7d73280a1ecce64d45 Mon Sep 17 00:00:00 2001 From: Calvin Giroud Date: Tue, 10 Dec 2024 21:05:21 -0500 Subject: [PATCH 07/13] fix all the warnings --- kernel/src/scan/log_replay.rs | 6 +++--- kernel/src/scan/mod.rs | 8 -------- kernel/src/scan/state.rs | 2 +- 3 files changed, 4 insertions(+), 12 deletions(-) diff --git a/kernel/src/scan/log_replay.rs b/kernel/src/scan/log_replay.rs index 428de6b9d..bce3fde2b 100644 --- a/kernel/src/scan/log_replay.rs +++ b/kernel/src/scan/log_replay.rs @@ -292,16 +292,16 @@ mod tests { &[true, false], (), validate_simple, - ); + ).unwrap(); } #[test] fn test_scan_action_iter_with_remove() { - run_with_validate_callback( + let _ = run_with_validate_callback( vec![add_batch_with_remove()], &[false, false, true, false], (), validate_simple, - ); + ).unwrap(); } } diff --git a/kernel/src/scan/mod.rs b/kernel/src/scan/mod.rs index 75daa1a24..bebc25817 100644 --- a/kernel/src/scan/mod.rs +++ b/kernel/src/scan/mod.rs @@ -703,7 +703,6 @@ pub(crate) mod test_utils { sync::{json::SyncJsonHandler, SyncEngine}, }, scan::log_replay::scan_action_iter, - schema::{StructField, StructType}, DeltaResult, EngineData, JsonHandler, }; @@ -757,13 +756,6 @@ pub(crate) mod test_utils { context: T, validate_callback: ScanCallback, ) -> DeltaResult<()> { - let engine = SyncEngine::new(); - // doesn't matter here - let table_schema = Arc::new(StructType::new([StructField::new( - "foo", - crate::schema::DataType::STRING, - false, - )])); let iter = scan_action_iter( &SyncEngine::new(), batch.into_iter().map(|batch| Ok((batch as _, true))), diff --git a/kernel/src/scan/state.rs b/kernel/src/scan/state.rs index 12bbed552..d89210621 100644 --- a/kernel/src/scan/state.rs +++ b/kernel/src/scan/state.rs @@ -253,6 +253,6 @@ mod tests { &[true, false], context, validate_visit, - ); + ).unwrap(); } } From b4bad3625c2a02f2f2868d7c7b27a81d760d8548 Mon Sep 17 00:00:00 2001 From: Calvin Giroud Date: Tue, 10 Dec 2024 21:05:34 -0500 Subject: [PATCH 08/13] format --- kernel/src/scan/log_replay.rs | 6 ++++-- kernel/src/scan/state.rs | 3 ++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/kernel/src/scan/log_replay.rs b/kernel/src/scan/log_replay.rs index bce3fde2b..2975ffac6 100644 --- a/kernel/src/scan/log_replay.rs +++ b/kernel/src/scan/log_replay.rs @@ -292,7 +292,8 @@ mod tests { &[true, false], (), validate_simple, - ).unwrap(); + ) + .unwrap(); } #[test] @@ -302,6 +303,7 @@ mod tests { &[false, false, true, false], (), validate_simple, - ).unwrap(); + ) + .unwrap(); } } diff --git a/kernel/src/scan/state.rs b/kernel/src/scan/state.rs index d89210621..50c81c090 100644 --- a/kernel/src/scan/state.rs +++ b/kernel/src/scan/state.rs @@ -253,6 +253,7 @@ mod tests { &[true, false], context, validate_visit, - ).unwrap(); + ) + .unwrap(); } } From e7b01b2453eb262fd4d9f5fb7eb27c1a58f9f7b5 Mon Sep 17 00:00:00 2001 From: Calvin Giroud Date: Tue, 10 Dec 2024 21:20:22 -0500 Subject: [PATCH 09/13] fix --- kernel/src/scan/log_replay.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kernel/src/scan/log_replay.rs b/kernel/src/scan/log_replay.rs index 2975ffac6..2c52592b5 100644 --- a/kernel/src/scan/log_replay.rs +++ b/kernel/src/scan/log_replay.rs @@ -298,7 +298,7 @@ mod tests { #[test] fn test_scan_action_iter_with_remove() { - let _ = run_with_validate_callback( + run_with_validate_callback( vec![add_batch_with_remove()], &[false, false, true, false], (), From 6be446d571dcb5cbb4edcfde42306be2af7b65cb Mon Sep 17 00:00:00 2001 From: Calvin Giroud Date: Wed, 11 Dec 2024 12:46:33 -0500 Subject: [PATCH 10/13] address comments --- kernel/src/scan/data_skipping.rs | 15 +++------------ kernel/src/scan/log_replay.rs | 8 ++++++-- 2 files changed, 9 insertions(+), 14 deletions(-) diff --git a/kernel/src/scan/data_skipping.rs b/kernel/src/scan/data_skipping.rs index bf00f2029..66c2c622f 100644 --- a/kernel/src/scan/data_skipping.rs +++ b/kernel/src/scan/data_skipping.rs @@ -107,10 +107,7 @@ impl DataSkippingFilter { STATS_EXPR.clone(), DataType::STRING, ) - .map_err(|e| { - warn!("Failed to create stats selector evaluator: {}", e); - e - }) + .inspect_err(|e| warn!("Failed to create stats selector evaluator: {}", e)) .ok()?; let skipping_evaluator = engine @@ -120,19 +117,13 @@ impl DataSkippingFilter { Expr::struct_from([as_data_skipping_predicate(&predicate, false)?]), PREDICATE_SCHEMA.clone(), ) - .map_err(|e| { - warn!("Failed to create skipping evaluator: {}", e); - e - }) + .inspect_err(|e| warn!("Failed to create skipping evaluator: {}", e)) .ok()?; let filter_evaluator = engine .get_expression_handler() .get_evaluator(stats_schema.clone(), FILTER_EXPR.clone(), DataType::BOOLEAN) - .map_err(|e| { - warn!("Failed to create filter evaluator: {}", e); - e - }) + .inspect_err(|e| warn!("Failed to create filter evaluator: {}", e)) .ok()?; Some(Self { diff --git a/kernel/src/scan/log_replay.rs b/kernel/src/scan/log_replay.rs index 2c52592b5..8238333ee 100644 --- a/kernel/src/scan/log_replay.rs +++ b/kernel/src/scan/log_replay.rs @@ -247,12 +247,16 @@ pub fn scan_action_iter( SCAN_ROW_DATATYPE.clone(), )?; - Ok(action_iter + let actions = action_iter .map(move |action_res| { let (batch, is_log_batch) = action_res?; log_scanner.process_scan_batch(add_transform.as_ref(), batch.as_ref(), is_log_batch) }) - .filter(|res| res.as_ref().map_or(true, |(_, sv)| sv.contains(&true)))) + .filter(|res: &Result<(Box, Vec), Error>| { + res.as_ref().map_or(true, |(_, sv)| sv.contains(&true)) + }); + + Ok(actions) } #[cfg(test)] From 22f9a3a6bdc610df101117894b0e604406c40281 Mon Sep 17 00:00:00 2001 From: Calvin Giroud Date: Wed, 11 Dec 2024 14:17:56 -0500 Subject: [PATCH 11/13] remove types --- kernel/src/scan/log_replay.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kernel/src/scan/log_replay.rs b/kernel/src/scan/log_replay.rs index 8238333ee..62e90d6ab 100644 --- a/kernel/src/scan/log_replay.rs +++ b/kernel/src/scan/log_replay.rs @@ -252,7 +252,7 @@ pub fn scan_action_iter( let (batch, is_log_batch) = action_res?; log_scanner.process_scan_batch(add_transform.as_ref(), batch.as_ref(), is_log_batch) }) - .filter(|res: &Result<(Box, Vec), Error>| { + .filter(|res| { res.as_ref().map_or(true, |(_, sv)| sv.contains(&true)) }); From fa7c9ffdc786671535df9c118bc455a005787cef Mon Sep 17 00:00:00 2001 From: Calvin Giroud Date: Wed, 11 Dec 2024 14:19:18 -0500 Subject: [PATCH 12/13] format --- kernel/src/scan/log_replay.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/kernel/src/scan/log_replay.rs b/kernel/src/scan/log_replay.rs index 62e90d6ab..b1a388102 100644 --- a/kernel/src/scan/log_replay.rs +++ b/kernel/src/scan/log_replay.rs @@ -252,9 +252,7 @@ pub fn scan_action_iter( let (batch, is_log_batch) = action_res?; log_scanner.process_scan_batch(add_transform.as_ref(), batch.as_ref(), is_log_batch) }) - .filter(|res| { - res.as_ref().map_or(true, |(_, sv)| sv.contains(&true)) - }); + .filter(|res| res.as_ref().map_or(true, |(_, sv)| sv.contains(&true))); Ok(actions) } From d8bc768c433c48d0af4cce38d9888143aa1523f3 Mon Sep 17 00:00:00 2001 From: Calvin Giroud Date: Fri, 13 Dec 2024 07:51:47 -0800 Subject: [PATCH 13/13] fix merge --- kernel/src/table_changes/scan.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/kernel/src/table_changes/scan.rs b/kernel/src/table_changes/scan.rs index 97944524f..f3c820cec 100644 --- a/kernel/src/table_changes/scan.rs +++ b/kernel/src/table_changes/scan.rs @@ -340,8 +340,7 @@ fn read_scan_file( // # Case 3 // These scan files are either simple adds, removes, or cdc files. This case is a noop because // the selection vector is `None`. - let extend = Some(! - ); + let extend = Some(!is_dv_resolved_pair); let rest = split_vector(sv.as_mut(), len, extend); let result = ScanResult { raw_data: logical,