From c947f4a918666369f70f8fb5ba905120d6e6bef5 Mon Sep 17 00:00:00 2001
From: Zach Schuermann
Date: Mon, 6 Jan 2025 10:32:53 -0800
Subject: [PATCH] add materialize_scan_results

---
 acceptance/src/data.rs                        |  1 +
 kernel/Cargo.toml                             |  3 ++
 .../examples/read-table-changes/src/main.rs   | 24 +++----------
 .../read-table-multi-threaded/src/main.rs     |  1 +
 .../read-table-single-threaded/src/main.rs    | 23 ++----------
 kernel/src/engine/arrow_compute.rs            | 28 +++++++++++++++
 kernel/src/engine/mod.rs                      |  3 ++
 kernel/tests/cdf.rs                           | 19 +++-------
 kernel/tests/common/mod.rs                    | 17 ++-------
 kernel/tests/golden_tables.rs                 | 35 +++++++------------
 kernel/tests/read.rs                          |  1 +
 11 files changed, 62 insertions(+), 93 deletions(-)
 create mode 100644 kernel/src/engine/arrow_compute.rs

diff --git a/acceptance/src/data.rs b/acceptance/src/data.rs
index 9832ac8a4..8bcef526c 100644
--- a/acceptance/src/data.rs
+++ b/acceptance/src/data.rs
@@ -116,6 +116,7 @@ pub async fn assert_scan_data(engine: Arc<dyn Engine>, test_case: &TestCaseInfo)
     let snapshot = table.snapshot(engine.as_ref(), None)?;
     let scan = snapshot.into_scan_builder().build()?;
     let mut schema = None;
+    // TODO replace with new util
     let batches: Vec<RecordBatch> = scan
         .execute(engine)?
         .map(|scan_result| -> DeltaResult<_> {
diff --git a/kernel/Cargo.toml b/kernel/Cargo.toml
index b5cb30634..4a4c01ae1 100644
--- a/kernel/Cargo.toml
+++ b/kernel/Cargo.toml
@@ -56,6 +56,8 @@ parquet = { workspace = true, optional = true }
 # Used for fetching direct urls (like pre-signed urls)
 reqwest = { version = "0.12.7", optional = true }
 strum = { version = "0.26", features = ["derive"] }
+# used in arrow-compute feature for record batch filtering utility
+arrow = { workspace = true, optional = true }
 
 
 # optionally used with default engine (though not required)
@@ -66,6 +68,7 @@ hdfs-native = { workspace = true, optional = true }
 walkdir = { workspace = true, optional = true }
 
 [features]
+arrow-compute = ["arrow", "arrow-conversion"]
 arrow-conversion = ["arrow-schema"]
 arrow-expression = ["arrow-arith", "arrow-array", "arrow-buffer", "arrow-ord", "arrow-schema"]
 cloud = [
diff --git a/kernel/examples/read-table-changes/src/main.rs b/kernel/examples/read-table-changes/src/main.rs
index 3360a06cf..4f1888821 100644
--- a/kernel/examples/read-table-changes/src/main.rs
+++ b/kernel/examples/read-table-changes/src/main.rs
@@ -1,9 +1,9 @@
 use std::{collections::HashMap, sync::Arc};
 
-use arrow::{compute::filter_record_batch, util::pretty::print_batches};
+use arrow::util::pretty::print_batches;
 use arrow_array::RecordBatch;
 use clap::Parser;
-use delta_kernel::engine::arrow_data::ArrowEngineData;
+use delta_kernel::engine::arrow_compute::materialize_scan_results;
 use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
 use delta_kernel::engine::default::DefaultEngine;
 use delta_kernel::{DeltaResult, Table};
@@ -35,24 +35,8 @@ fn main() -> DeltaResult<()> {
     let table_changes = table.table_changes(engine.as_ref(), cli.start_version, cli.end_version)?;
 
     let table_changes_scan = table_changes.into_scan_builder().build()?;
-    let batches: Vec<RecordBatch> = table_changes_scan
-        .execute(engine.clone())?
-        .map(|scan_result| -> DeltaResult<_> {
-            let scan_result = scan_result?;
-            let mask = scan_result.full_mask();
-            let data = scan_result.raw_data?;
-            let record_batch: RecordBatch = data
-                .into_any()
-                .downcast::<ArrowEngineData>()
-                .map_err(|_| delta_kernel::Error::EngineDataType("ArrowEngineData".to_string()))?
-                .into();
-            if let Some(mask) = mask {
-                Ok(filter_record_batch(&record_batch, &mask.into())?)
-            } else {
-                Ok(record_batch)
-            }
-        })
-        .try_collect()?;
+    let batches: Vec<RecordBatch> =
+        materialize_scan_results(table_changes_scan.execute(engine.clone())?).try_collect()?;
     print_batches(&batches)?;
     Ok(())
 }
diff --git a/kernel/examples/read-table-multi-threaded/src/main.rs b/kernel/examples/read-table-multi-threaded/src/main.rs
index d97b6c2d3..7f1ffb845 100644
--- a/kernel/examples/read-table-multi-threaded/src/main.rs
+++ b/kernel/examples/read-table-multi-threaded/src/main.rs
@@ -290,6 +290,7 @@ fn do_work(
             .read_parquet_files(&[meta], physical_schema.clone(), None)
             .unwrap();
 
+        // replace with materialize_scan_results?
         for read_result in read_results {
             let read_result = read_result.unwrap();
             let len = read_result.len();
diff --git a/kernel/examples/read-table-single-threaded/src/main.rs b/kernel/examples/read-table-single-threaded/src/main.rs
index 32ad3173d..4e3d222df 100644
--- a/kernel/examples/read-table-single-threaded/src/main.rs
+++ b/kernel/examples/read-table-single-threaded/src/main.rs
@@ -2,10 +2,9 @@ use std::collections::HashMap;
 use std::process::ExitCode;
 use std::sync::Arc;
 
-use arrow::compute::filter_record_batch;
 use arrow::record_batch::RecordBatch;
 use arrow::util::pretty::print_batches;
-use delta_kernel::engine::arrow_data::ArrowEngineData;
+use delta_kernel::engine::arrow_compute::materialize_scan_results;
 use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
 use delta_kernel::engine::default::DefaultEngine;
 use delta_kernel::engine::sync::SyncEngine;
@@ -119,24 +118,8 @@ fn try_main() -> DeltaResult<()> {
         .with_schema_opt(read_schema_opt)
         .build()?;
 
-    let batches: Vec<RecordBatch> = scan
-        .execute(engine)?
-        .map(|scan_result| -> DeltaResult<_> {
-            let scan_result = scan_result?;
-            let mask = scan_result.full_mask();
-            let data = scan_result.raw_data?;
-            let record_batch: RecordBatch = data
-                .into_any()
-                .downcast::<ArrowEngineData>()
-                .map_err(|_| delta_kernel::Error::EngineDataType("ArrowEngineData".to_string()))?
-                .into();
-            if let Some(mask) = mask {
-                Ok(filter_record_batch(&record_batch, &mask.into())?)
-            } else {
-                Ok(record_batch)
-            }
-        })
-        .try_collect()?;
+    let batches: Vec<RecordBatch> =
+        materialize_scan_results(scan.execute(engine)?).try_collect()?;
     print_batches(&batches)?;
     Ok(())
 }
diff --git a/kernel/src/engine/arrow_compute.rs b/kernel/src/engine/arrow_compute.rs
new file mode 100644
index 000000000..3f416c392
--- /dev/null
+++ b/kernel/src/engine/arrow_compute.rs
@@ -0,0 +1,28 @@
+use crate::engine::arrow_data::ArrowEngineData;
+use crate::error::DeltaResult;
+use crate::scan::ScanResult;
+
+use arrow::compute::filter_record_batch;
+use arrow_array::RecordBatch;
+
+/// Utility function that transforms an iterator of `ScanResult`s into an iterator of
+/// `RecordBatch`s containing the actual scan data by applying the scan result's mask to the data.
+///
+/// It uses arrow-compute's `filter_record_batch` function to apply the mask to the data.
+pub fn materialize_scan_results(
+    scan_results: impl Iterator<Item = DeltaResult<ScanResult>>,
+) -> impl Iterator<Item = DeltaResult<RecordBatch>> {
+    scan_results.map(|res| {
+        let scan_res = res.and_then(|res| Ok((res.full_mask(), res.raw_data?)));
+        let (mask, data) = scan_res?;
+        let record_batch: RecordBatch = data
+            .into_any()
+            .downcast::<ArrowEngineData>()
+            .map_err(|_| crate::Error::EngineDataType("ArrowEngineData".to_string()))?
+            .into();
+        Ok(match mask {
+            Some(mask) => filter_record_batch(&record_batch, &mask.into())?,
+            None => record_batch,
+        })
+    })
+}
diff --git a/kernel/src/engine/mod.rs b/kernel/src/engine/mod.rs
index fdeca558a..bbd0243a7 100644
--- a/kernel/src/engine/mod.rs
+++ b/kernel/src/engine/mod.rs
@@ -33,3 +33,6 @@ declare_modules!(
     (pub(crate), arrow_utils),
     (pub(crate), ensure_data_types)
 );
+
+#[cfg(feature = "arrow-compute")]
+pub mod arrow_compute;
diff --git a/kernel/tests/cdf.rs b/kernel/tests/cdf.rs
index 2be5324fc..d22b25df0 100644
--- a/kernel/tests/cdf.rs
+++ b/kernel/tests/cdf.rs
@@ -1,15 +1,15 @@
 use std::{error, sync::Arc};
 
-use arrow::compute::filter_record_batch;
 use arrow_array::RecordBatch;
 use delta_kernel::engine::sync::SyncEngine;
 use itertools::Itertools;
 
+use delta_kernel::engine::arrow_compute::materialize_scan_results;
 use delta_kernel::engine::arrow_data::ArrowEngineData;
 use delta_kernel::{DeltaResult, Table, Version};
 
 mod common;
-use common::{load_test_data, to_arrow};
+use common::load_test_data;
 
 fn read_cdf_for_table(
     test_name: impl AsRef<str>,
@@ -35,19 +35,8 @@ fn read_cdf_for_table(
         .into_scan_builder()
         .with_schema(schema)
         .build()?;
-    let batches: Vec<RecordBatch> = scan
-        .execute(engine)?
-        .map(|scan_result| -> DeltaResult<_> {
-            let scan_result = scan_result?;
-            let mask = scan_result.full_mask();
-            let data = scan_result.raw_data?;
-            let record_batch = to_arrow(data)?;
-            match mask {
-                Some(mask) => Ok(filter_record_batch(&record_batch, &mask.into())?),
-                None => Ok(record_batch),
-            }
-        })
-        .try_collect()?;
+    let batches: Vec<RecordBatch> =
+        materialize_scan_results(scan.execute(engine)?).try_collect()?;
 
     Ok(batches)
 }
diff --git a/kernel/tests/common/mod.rs b/kernel/tests/common/mod.rs
index a918695b7..81e1ad086 100644
--- a/kernel/tests/common/mod.rs
+++ b/kernel/tests/common/mod.rs
@@ -1,4 +1,3 @@
-use arrow::compute::filter_record_batch;
 use arrow::record_batch::RecordBatch;
 use arrow::util::pretty::pretty_format_batches;
 use itertools::Itertools;
@@ -6,6 +5,7 @@ use itertools::Itertools;
 use crate::ArrowEngineData;
 use delta_kernel::scan::Scan;
 use delta_kernel::{DeltaResult, Engine, EngineData, Table};
+use delta_kernel::engine::arrow_compute::materialize_scan_results;
 
 use std::sync::Arc;
 
@@ -87,18 +87,5 @@ pub(crate) fn test_read(
 // TODO (zach): this is listed as unused for acceptance crate
 #[allow(unused)]
 pub(crate) fn read_scan(scan: &Scan, engine: Arc<dyn Engine>) -> DeltaResult<Vec<RecordBatch>> {
-    let scan_results = scan.execute(engine)?;
-    scan_results
-        .map(|scan_result| -> DeltaResult<_> {
-            let scan_result = scan_result?;
-            let mask = scan_result.full_mask();
-            let data = scan_result.raw_data?;
-            let record_batch = to_arrow(data)?;
-            if let Some(mask) = mask {
-                Ok(filter_record_batch(&record_batch, &mask.into())?)
-            } else {
-                Ok(record_batch)
-            }
-        })
-        .try_collect()
+    materialize_scan_results(scan.execute(engine)?).try_collect()
 }
diff --git a/kernel/tests/golden_tables.rs b/kernel/tests/golden_tables.rs
index 1d0c8406b..fff69ffaf 100644
--- a/kernel/tests/golden_tables.rs
+++ b/kernel/tests/golden_tables.rs
@@ -3,28 +3,29 @@
 //! Data (golden tables) are stored in tests/golden_data/<table_name>.tar.zst
 //! Each table directory has a table/ and expected/ subdirectory with the input/output respectively
 
+use std::path::{Path, PathBuf};
+use std::sync::Arc;
+
 use arrow::array::AsArray;
-use arrow::{compute::filter_record_batch, record_batch::RecordBatch};
+use arrow::record_batch::RecordBatch;
+use arrow_array::{Array, StructArray};
 use arrow_ord::sort::{lexsort_to_indices, SortColumn};
+use arrow_schema::DataType;
 use arrow_schema::{FieldRef, Schema};
 use arrow_select::{concat::concat_batches, take::take};
-use itertools::Itertools;
-use paste::paste;
-use std::path::{Path, PathBuf};
-use std::sync::Arc;
-
-use delta_kernel::{engine::arrow_data::ArrowEngineData, DeltaResult, Table};
 use futures::{stream::TryStreamExt, StreamExt};
+use itertools::Itertools;
 use object_store::{local::LocalFileSystem, ObjectStore};
 use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder};
+use paste::paste;
 
-use arrow_array::{Array, StructArray};
-use arrow_schema::DataType;
+use delta_kernel::engine::arrow_compute::materialize_scan_results;
 use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
 use delta_kernel::engine::default::DefaultEngine;
+use delta_kernel::{engine::arrow_data::ArrowEngineData, DeltaResult, Table};
 
 mod common;
-use common::{load_test_data, to_arrow};
+use common::load_test_data;
 
 // NB adapated from DAT: read all parquet files in the directory and concatenate them
 async fn read_expected(path: &Path) -> DeltaResult<RecordBatch> {
@@ -171,19 +172,7 @@ async fn latest_snapshot_test(
     let snapshot = table.snapshot(&engine, None)?;
     let scan = snapshot.into_scan_builder().build()?;
     let scan_res = scan.execute(Arc::new(engine))?;
-    let batches: Vec<RecordBatch> = scan_res
-        .map(|scan_result| -> DeltaResult<_> {
-            let scan_result = scan_result?;
-            let mask = scan_result.full_mask();
-            let data = scan_result.raw_data?;
-            let record_batch = to_arrow(data)?;
-            if let Some(mask) = mask {
-                Ok(filter_record_batch(&record_batch, &mask.into())?)
-            } else {
-                Ok(record_batch)
-            }
-        })
-        .try_collect()?;
+    let batches: Vec<RecordBatch> = materialize_scan_results(scan_res).try_collect()?;
 
     let expected = read_expected(&expected_path.expect("expect an expected dir")).await?;
 
diff --git a/kernel/tests/read.rs b/kernel/tests/read.rs
index ae49b70e2..511217152 100644
--- a/kernel/tests/read.rs
+++ b/kernel/tests/read.rs
@@ -394,6 +394,7 @@ fn read_with_scan_data(
             )
             .unwrap();
 
+        // TODO(zach): use materialize_scan_results?
         for read_result in read_results {
             let read_result = read_result.unwrap();
             let len = read_result.len();
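
Usage sketch (not part of the diff): the flow below mirrors the updated
read-table-single-threaded example. It assumes the `arrow-compute` and
`default-engine` features are enabled, and the local table path `./my_table`
is hypothetical.

    use std::collections::HashMap;
    use std::sync::Arc;

    use arrow::util::pretty::print_batches;
    use arrow_array::RecordBatch;
    use itertools::Itertools;

    use delta_kernel::engine::arrow_compute::materialize_scan_results;
    use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
    use delta_kernel::engine::default::DefaultEngine;
    use delta_kernel::{DeltaResult, Engine, Table};

    fn main() -> DeltaResult<()> {
        // Open the table and build a default (tokio-backed) engine for it.
        // `./my_table` is a placeholder path for illustration only.
        let table = Table::try_from_uri("./my_table")?;
        let engine: Arc<dyn Engine> = Arc::new(DefaultEngine::try_new(
            table.location(),
            HashMap::<String, String>::new(),
            Arc::new(TokioBackgroundExecutor::new()),
        )?);

        // Scan the latest snapshot; materialize_scan_results applies each scan
        // result's selection mask and yields plain arrow RecordBatches.
        let snapshot = table.snapshot(engine.as_ref(), None)?;
        let scan = snapshot.into_scan_builder().build()?;
        let batches: Vec<RecordBatch> =
            materialize_scan_results(scan.execute(engine)?).try_collect()?;
        print_batches(&batches)?;
        Ok(())
    }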