[wip] feat: add materialize_scan_results arrow utility #621

Draft: wants to merge 1 commit into base: main

1 change: 1 addition & 0 deletions acceptance/src/data.rs
@@ -116,6 +116,7 @@ pub async fn assert_scan_data(engine: Arc<dyn Engine>, test_case: &TestCaseInfo)
let snapshot = table.snapshot(engine.as_ref(), None)?;
let scan = snapshot.into_scan_builder().build()?;
let mut schema = None;
// TODO replace with new util
let batches: Vec<RecordBatch> = scan
.execute(engine)?
.map(|scan_result| -> DeltaResult<_> {
3 changes: 3 additions & 0 deletions kernel/Cargo.toml
@@ -56,6 +56,8 @@ parquet = { workspace = true, optional = true }
# Used for fetching direct urls (like pre-signed urls)
reqwest = { version = "0.12.7", optional = true }
strum = { version = "0.26", features = ["derive"] }
# used in arrow-compute feature for record batch filtering utility
arrow = { workspace = true, optional = true }


# optionally used with default engine (though not required)
@@ -66,6 +68,7 @@ hdfs-native = { workspace = true, optional = true }
walkdir = { workspace = true, optional = true }

[features]
arrow-compute = ["arrow", "arrow-conversion"]
arrow-conversion = ["arrow-schema"]
arrow-expression = ["arrow-arith", "arrow-array", "arrow-buffer", "arrow-ord", "arrow-schema"]
cloud = [
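For context, the new arrow-compute feature is opt-in, so a consumer crate has to enable it explicitly. A minimal sketch of a hypothetical downstream Cargo.toml (the crate version shown is illustrative, not something this PR pins):

```toml
[dependencies]
# Hypothetical consumer opting into the new record-batch filtering utility;
# the version number is illustrative only.
delta_kernel = { version = "0.6", features = ["arrow-compute"] }
```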
24 changes: 4 additions & 20 deletions kernel/examples/read-table-changes/src/main.rs
@@ -1,9 +1,9 @@
use std::{collections::HashMap, sync::Arc};

use arrow::{compute::filter_record_batch, util::pretty::print_batches};
use arrow::util::pretty::print_batches;
use arrow_array::RecordBatch;
use clap::Parser;
use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::engine::arrow_compute::materialize_scan_results;
use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
use delta_kernel::engine::default::DefaultEngine;
use delta_kernel::{DeltaResult, Table};
@@ -35,24 +35,8 @@ fn main() -> DeltaResult<()> {
let table_changes = table.table_changes(engine.as_ref(), cli.start_version, cli.end_version)?;

let table_changes_scan = table_changes.into_scan_builder().build()?;
let batches: Vec<RecordBatch> = table_changes_scan
.execute(engine.clone())?
.map(|scan_result| -> DeltaResult<_> {
let scan_result = scan_result?;
let mask = scan_result.full_mask();
let data = scan_result.raw_data?;
let record_batch: RecordBatch = data
.into_any()
.downcast::<ArrowEngineData>()
.map_err(|_| delta_kernel::Error::EngineDataType("ArrowEngineData".to_string()))?
.into();
if let Some(mask) = mask {
Ok(filter_record_batch(&record_batch, &mask.into())?)
} else {
Ok(record_batch)
}
})
.try_collect()?;
let batches: Vec<RecordBatch> =
materialize_scan_results(table_changes_scan.execute(engine.clone())?).try_collect()?;
Comment on lines +38 to +39 (Collaborator):

I wonder if it would work better to define a materialize_scan_result function that takes DeltaResult<ScanResult> and returns DeltaResult<RecordBatch>? It would be a simpler type signature, and would also allow a more natural use at the call site:

Suggested change
- let batches: Vec<RecordBatch> =
-     materialize_scan_results(table_changes_scan.execute(engine.clone())?).try_collect()?;
+ let batches: Vec<_> = table_changes_scan
+     .execute(engine.clone())?
+     .map(materialize_scan_result)
+     .try_collect()?;

(more readable IMO even tho it technically has more lines of code)
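For illustration, a minimal sketch of what that per-result helper could look like, reusing the same downcast-and-filter logic that this PR adds in kernel/src/engine/arrow_compute.rs; the name materialize_scan_result and its signature are only the reviewer's proposal, not something the PR currently contains:

```rust
use crate::engine::arrow_data::ArrowEngineData;
use crate::error::DeltaResult;
use crate::scan::ScanResult;

use arrow::compute::filter_record_batch;
use arrow_array::RecordBatch;

/// Sketch of the reviewer's proposed per-item helper: unwrap one scan result,
/// downcast its raw data to Arrow, and apply the selection mask if one exists.
pub fn materialize_scan_result(
    scan_result: DeltaResult<ScanResult>,
) -> DeltaResult<RecordBatch> {
    let scan_result = scan_result?;
    let mask = scan_result.full_mask();
    let data = scan_result.raw_data?;
    let record_batch: RecordBatch = data
        .into_any()
        .downcast::<ArrowEngineData>()
        .map_err(|_| crate::Error::EngineDataType("ArrowEngineData".to_string()))?
        .into();
    match mask {
        Some(mask) => Ok(filter_record_batch(&record_batch, &mask.into())?),
        None => Ok(record_batch),
    }
}
```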

print_batches(&batches)?;
Ok(())
}
1 change: 1 addition & 0 deletions kernel/examples/read-table-multi-threaded/src/main.rs
@@ -290,6 +290,7 @@ fn do_work(
.read_parquet_files(&[meta], physical_schema.clone(), None)
.unwrap();

// replace with materialize_scan_results?
for read_result in read_results {
let read_result = read_result.unwrap();
let len = read_result.len();
23 changes: 3 additions & 20 deletions kernel/examples/read-table-single-threaded/src/main.rs
@@ -2,10 +2,9 @@ use std::collections::HashMap;
use std::process::ExitCode;
use std::sync::Arc;

use arrow::compute::filter_record_batch;
use arrow::record_batch::RecordBatch;
use arrow::util::pretty::print_batches;
use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::engine::arrow_compute::materialize_scan_results;
use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
use delta_kernel::engine::default::DefaultEngine;
use delta_kernel::engine::sync::SyncEngine;
@@ -119,24 +118,8 @@ fn try_main() -> DeltaResult<()> {
.with_schema_opt(read_schema_opt)
.build()?;

let batches: Vec<RecordBatch> = scan
.execute(engine)?
.map(|scan_result| -> DeltaResult<_> {
let scan_result = scan_result?;
let mask = scan_result.full_mask();
let data = scan_result.raw_data?;
let record_batch: RecordBatch = data
.into_any()
.downcast::<ArrowEngineData>()
.map_err(|_| delta_kernel::Error::EngineDataType("ArrowEngineData".to_string()))?
.into();
if let Some(mask) = mask {
Ok(filter_record_batch(&record_batch, &mask.into())?)
} else {
Ok(record_batch)
}
})
.try_collect()?;
let batches: Vec<RecordBatch> =
materialize_scan_results(scan.execute(engine)?).try_collect()?;
print_batches(&batches)?;
Ok(())
}
28 changes: 28 additions & 0 deletions kernel/src/engine/arrow_compute.rs
@@ -0,0 +1,28 @@
use crate::engine::arrow_data::ArrowEngineData;
use crate::error::DeltaResult;
use crate::scan::ScanResult;

use arrow::compute::filter_record_batch;
use arrow_array::RecordBatch;

/// Utility function that transforms an iterator of `ScanResult`s into an iterator of
/// `RecordBatch`es containing the materialized scan data, with each scan result's
/// selection mask applied to its data.
///
/// The mask is applied using arrow-compute's `filter_record_batch` function.
pub fn materialize_scan_results(
scan_results: impl Iterator<Item = DeltaResult<ScanResult>>,
) -> impl Iterator<Item = DeltaResult<RecordBatch>> {
scan_results.map(|res| {
let scan_res = res.and_then(|res| Ok((res.full_mask(), res.raw_data?)));
let (mask, data) = scan_res?;
Comment on lines +16 to +17 (Collaborator):

Suggested change
- let scan_res = res.and_then(|res| Ok((res.full_mask(), res.raw_data?)));
- let (mask, data) = scan_res?;
+ let res = res?;
+ let (mask, data) = (res.full_mask(), res.raw_data?);

let record_batch: RecordBatch = data
.into_any()
.downcast::<ArrowEngineData>()
.map_err(|_| crate::Error::EngineDataType("ArrowEngineData".to_string()))?
.into();
Ok(match mask {
Some(mask) => filter_record_batch(&record_batch, &mask.into())?,
None => record_batch,
})
Comment on lines +23 to +26 (Collaborator):

Is updating a mut record_batch potentially simpler?

Suggested change
- Ok(match mask {
-     Some(mask) => filter_record_batch(&record_batch, &mask.into())?,
-     None => record_batch,
- })
+ if let Some(mask) = mask {
+     record_batch = filter_record_batch(&record_batch, &mask.into())?;
+ }
+ Ok(record_batch)

})
}
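Taken together, a self-contained sketch of how a caller might use the new utility end to end, mirroring the call sites updated in this PR; the use of Table::try_from_uri and SyncEngine for setup is an assumption for illustration, not part of this change:

```rust
use std::sync::Arc;

use arrow::util::pretty::print_batches;
use delta_kernel::engine::arrow_compute::materialize_scan_results;
use delta_kernel::engine::sync::SyncEngine;
use delta_kernel::{DeltaResult, Table};
use itertools::Itertools;

fn print_table(path: &str) -> DeltaResult<()> {
    // Build a scan over the latest snapshot of the table.
    let engine = Arc::new(SyncEngine::new());
    let table = Table::try_from_uri(path)?;
    let snapshot = table.snapshot(engine.as_ref(), None)?;
    let scan = snapshot.into_scan_builder().build()?;

    // materialize_scan_results handles the downcast to ArrowEngineData and the mask filtering.
    let batches: Vec<_> = materialize_scan_results(scan.execute(engine)?).try_collect()?;
    print_batches(&batches)?;
    Ok(())
}
```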
3 changes: 3 additions & 0 deletions kernel/src/engine/mod.rs
@@ -33,3 +33,6 @@ declare_modules!(
(pub(crate), arrow_utils),
(pub(crate), ensure_data_types)
);

#[cfg(feature = "arrow-compute")]
pub mod arrow_compute;
19 changes: 4 additions & 15 deletions kernel/tests/cdf.rs
@@ -1,15 +1,15 @@
use std::{error, sync::Arc};

use arrow::compute::filter_record_batch;
use arrow_array::RecordBatch;
use delta_kernel::engine::sync::SyncEngine;
use itertools::Itertools;

use delta_kernel::engine::arrow_compute::materialize_scan_results;
use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::{DeltaResult, Table, Version};

mod common;
use common::{load_test_data, to_arrow};
use common::load_test_data;

fn read_cdf_for_table(
test_name: impl AsRef<str>,
@@ -35,19 +35,8 @@ fn read_cdf_for_table(
.into_scan_builder()
.with_schema(schema)
.build()?;
let batches: Vec<RecordBatch> = scan
.execute(engine)?
.map(|scan_result| -> DeltaResult<_> {
let scan_result = scan_result?;
let mask = scan_result.full_mask();
let data = scan_result.raw_data?;
let record_batch = to_arrow(data)?;
match mask {
Some(mask) => Ok(filter_record_batch(&record_batch, &mask.into())?),
None => Ok(record_batch),
}
})
.try_collect()?;
let batches: Vec<RecordBatch> =
materialize_scan_results(scan.execute(engine)?).try_collect()?;

Collaborator:

nit: Type annotation not needed, read_cdf_for_table return value fully constrains it.

(probably a bunch of other call sites can also simplify -- even e.g. arrow print_batches above, that takes a slice, still constrains the inner type so we can annotate as Vec<_>)

Ok(batches)
}

17 changes: 2 additions & 15 deletions kernel/tests/common/mod.rs
@@ -1,11 +1,11 @@
use arrow::compute::filter_record_batch;
use arrow::record_batch::RecordBatch;
use arrow::util::pretty::pretty_format_batches;
use itertools::Itertools;

use crate::ArrowEngineData;
use delta_kernel::scan::Scan;
use delta_kernel::{DeltaResult, Engine, EngineData, Table};
use delta_kernel::engine::arrow_compute::materialize_scan_results;

use std::sync::Arc;

@@ -87,18 +87,5 @@ pub(crate) fn test_read(
// TODO (zach): this is listed as unused for acceptance crate
#[allow(unused)]
pub(crate) fn read_scan(scan: &Scan, engine: Arc<dyn Engine>) -> DeltaResult<Vec<RecordBatch>> {
let scan_results = scan.execute(engine)?;
scan_results
.map(|scan_result| -> DeltaResult<_> {
let scan_result = scan_result?;
let mask = scan_result.full_mask();
let data = scan_result.raw_data?;
let record_batch = to_arrow(data)?;
if let Some(mask) = mask {
Ok(filter_record_batch(&record_batch, &mask.into())?)
} else {
Ok(record_batch)
}
})
.try_collect()
materialize_scan_results(scan.execute(engine)?).try_collect()
}
35 changes: 12 additions & 23 deletions kernel/tests/golden_tables.rs
@@ -3,28 +3,29 @@
//! Data (golden tables) are stored in tests/golden_data/<table_name>.tar.zst
//! Each table directory has a table/ and expected/ subdirectory with the input/output respectively

use std::path::{Path, PathBuf};
use std::sync::Arc;

use arrow::array::AsArray;
use arrow::{compute::filter_record_batch, record_batch::RecordBatch};
use arrow::record_batch::RecordBatch;
use arrow_array::{Array, StructArray};
use arrow_ord::sort::{lexsort_to_indices, SortColumn};
use arrow_schema::DataType;
use arrow_schema::{FieldRef, Schema};
use arrow_select::{concat::concat_batches, take::take};
use itertools::Itertools;
use paste::paste;
use std::path::{Path, PathBuf};
use std::sync::Arc;

use delta_kernel::{engine::arrow_data::ArrowEngineData, DeltaResult, Table};
use futures::{stream::TryStreamExt, StreamExt};
use itertools::Itertools;
use object_store::{local::LocalFileSystem, ObjectStore};
use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder};
use paste::paste;

use arrow_array::{Array, StructArray};
use arrow_schema::DataType;
use delta_kernel::engine::arrow_compute::materialize_scan_results;
use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
use delta_kernel::engine::default::DefaultEngine;
use delta_kernel::{engine::arrow_data::ArrowEngineData, DeltaResult, Table};

mod common;
use common::{load_test_data, to_arrow};
use common::load_test_data;

// NB adapted from DAT: read all parquet files in the directory and concatenate them
async fn read_expected(path: &Path) -> DeltaResult<RecordBatch> {
@@ -171,19 +172,7 @@ async fn latest_snapshot_test(
let snapshot = table.snapshot(&engine, None)?;
let scan = snapshot.into_scan_builder().build()?;
let scan_res = scan.execute(Arc::new(engine))?;
let batches: Vec<RecordBatch> = scan_res
.map(|scan_result| -> DeltaResult<_> {
let scan_result = scan_result?;
let mask = scan_result.full_mask();
let data = scan_result.raw_data?;
let record_batch = to_arrow(data)?;
if let Some(mask) = mask {
Ok(filter_record_batch(&record_batch, &mask.into())?)
} else {
Ok(record_batch)
}
})
.try_collect()?;
let batches: Vec<RecordBatch> = materialize_scan_results(scan_res).try_collect()?;

let expected = read_expected(&expected_path.expect("expect an expected dir")).await?;

1 change: 1 addition & 0 deletions kernel/tests/read.rs
@@ -394,6 +394,7 @@ fn read_with_scan_data(
)
.unwrap();

// TODO(zach): use materialize_scan_results?
for read_result in read_results {
let read_result = read_result.unwrap();
let len = read_result.len();