[wip] feat: add materialize_scan_results arrow utility #621
base: main
@@ -0,0 +1,28 @@
use crate::engine::arrow_data::ArrowEngineData;
use crate::error::DeltaResult;
use crate::scan::ScanResult;

use arrow::compute::filter_record_batch;
use arrow_array::RecordBatch;

/// Utility function that transforms an iterator of `ScanResult`s into an iterator of
/// `RecordBatch`s containing the actual scan data by applying the scan result's mask to the data.
///
/// It uses arrow-compute's `filter_record_batch` function to apply the mask to the data.
pub fn materialize_scan_results(
    scan_results: impl Iterator<Item = DeltaResult<ScanResult>>,
) -> impl Iterator<Item = DeltaResult<RecordBatch>> {
    scan_results.map(|res| {
        let scan_res = res.and_then(|res| Ok((res.full_mask(), res.raw_data?)));
        let (mask, data) = scan_res?;
Comment on lines +16 to +17
Suggested change
        let record_batch: RecordBatch = data
            .into_any()
            .downcast::<ArrowEngineData>()
            .map_err(|_| crate::Error::EngineDataType("ArrowEngineData".to_string()))?
            .into();
        Ok(match mask {
            Some(mask) => filter_record_batch(&record_batch, &mask.into())?,
            None => record_batch,
        })
Comment on lines +23 to +26
Is updating a
Suggested change
    })
}
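For readers less familiar with selection masks, the effect of the `match mask` step in the function above can be sketched with plain vectors. This is a std-only illustration, not the kernel or arrow API: `apply_mask` is a hypothetical helper, `Vec<i64>` and `Vec<bool>` stand in for `RecordBatch` and the boolean mask, and the sketch assumes the mask has exactly one entry per row.

```rust
/// Hypothetical stand-in for the mask step: keep row `i` only when `mask[i]` is true.
/// A `None` mask means nothing was masked out, so the rows pass through unchanged.
/// Assumes `mask.len() == rows.len()` when a mask is present.
fn apply_mask(rows: Vec<i64>, mask: Option<Vec<bool>>) -> Vec<i64> {
    match mask {
        Some(mask) => rows
            .into_iter()
            .zip(mask)
            // Pair each row with its flag and drop the rows flagged `false`.
            .filter_map(|(row, keep)| keep.then_some(row))
            .collect(),
        None => rows,
    }
}
```

Under these assumptions, `apply_mask(vec![10, 20, 30], Some(vec![true, false, true]))` yields `[10, 30]`, while a `None` mask returns the input as-is.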
@@ -1,15 +1,15 @@
use std::{error, sync::Arc};

use arrow::compute::filter_record_batch;
use arrow_array::RecordBatch;
use delta_kernel::engine::sync::SyncEngine;
use itertools::Itertools;

use delta_kernel::engine::arrow_compute::materialize_scan_results;
use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::{DeltaResult, Table, Version};

mod common;
use common::{load_test_data, to_arrow};
use common::load_test_data;

fn read_cdf_for_table(
    test_name: impl AsRef<str>,
@@ -35,19 +35,8 @@ fn read_cdf_for_table(
        .into_scan_builder()
        .with_schema(schema)
        .build()?;
    let batches: Vec<RecordBatch> = scan
        .execute(engine)?
        .map(|scan_result| -> DeltaResult<_> {
            let scan_result = scan_result?;
            let mask = scan_result.full_mask();
            let data = scan_result.raw_data?;
            let record_batch = to_arrow(data)?;
            match mask {
                Some(mask) => Ok(filter_record_batch(&record_batch, &mask.into())?),
                None => Ok(record_batch),
            }
        })
        .try_collect()?;
    let batches: Vec<RecordBatch> =
nit: Type annotation not needed (probably a bunch of other call sites can also simplify -- even e.g. arrow
        materialize_scan_results(scan.execute(engine)?).try_collect()?;
    Ok(batches)
}
I wonder if it would work better to define a `materialize_scan_result` function that takes `DeltaResult<ScanResult>` and returns `DeltaResult<RecordBatch>`? It would be a simpler type signature, and would also allow a more natural use at the call site:
(more readable IMO even tho it technically has more lines of code)
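One way the suggested per-item shape could look, sketched with std-only stand-ins rather than the real kernel and arrow types. This is not the reviewer's snippet: `ScanResult`, `DeltaResult`, `Batch`, and `collect_batches` below are simplified placeholders introduced for illustration, the `String` error type is an assumption, and the mask is assumed to have one entry per row.

```rust
// Simplified stand-ins (assumptions, not the delta_kernel or arrow definitions).
type DeltaResult<T> = Result<T, String>;
type Batch = Vec<i64>; // placeholder for arrow's `RecordBatch`

struct ScanResult {
    raw_data: DeltaResult<Batch>,
    mask: Option<Vec<bool>>,
}

impl ScanResult {
    // Placeholder for the kernel's `full_mask()`; here it only clones the stored mask.
    fn full_mask(&self) -> Option<Vec<bool>> {
        self.mask.clone()
    }
}

/// Per-item variant: one `DeltaResult<ScanResult>` in, one `DeltaResult<Batch>` out.
fn materialize_scan_result(scan_result: DeltaResult<ScanResult>) -> DeltaResult<Batch> {
    let scan_result = scan_result?;
    let mask = scan_result.full_mask();
    let data = scan_result.raw_data?;
    Ok(match mask {
        // Keep only the rows whose mask entry is `true`.
        Some(mask) => data
            .into_iter()
            .zip(mask)
            .filter_map(|(row, keep)| keep.then_some(row))
            .collect(),
        None => data,
    })
}

/// Hypothetical call site: the per-item function is passed straight to `map`,
/// and collecting into a `Result` stops at the first error.
fn collect_batches(
    scan_results: impl Iterator<Item = DeltaResult<ScanResult>>,
) -> DeltaResult<Vec<Batch>> {
    scan_results.map(materialize_scan_result).collect()
}
```

With this shape the caller keeps control of the iterator chain, so the function can be combined with other adapters without an `impl Iterator` in its signature.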