Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Part 2: propagate transform in visit_scan_files #612

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ffi/examples/read-table/arrow.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ static GArrowRecordBatch* add_partition_columns(
char* col = partition_cols->cols[i];
guint pos = cols + i;
KernelStringSlice key = { col, strlen(col) };
char* partition_val = get_from_map(partition_values, key, allocate_string);
char* partition_val = get_from_string_map(partition_values, key, allocate_string);
print_diag(
" Adding partition column '%s' with value '%s' at column %u\n",
col,
Expand Down
7 changes: 4 additions & 3 deletions ffi/examples/read-table/read_table.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ void print_partition_info(struct EngineContext* context, const CStringMap* parti
for (uintptr_t i = 0; i < context->partition_cols->len; i++) {
char* col = context->partition_cols->cols[i];
KernelStringSlice key = { col, strlen(col) };
char* partition_val = get_from_map(partition_values, key, allocate_string);
char* partition_val = get_from_string_map(partition_values, key, allocate_string);
if (partition_val) {
print_diag(" partition '%s' here: %s\n", col, partition_val);
free(partition_val);
Expand Down Expand Up @@ -87,14 +87,15 @@ void scan_row_callback(
void do_visit_scan_data(
void* engine_context,
ExclusiveEngineData* engine_data,
KernelBoolSlice selection_vec)
KernelBoolSlice selection_vec,
const CTransforms* transforms)
{
print_diag("\nScan iterator found some data to read\n Of this data, here is "
"a selection vector\n");
print_selection_vector(" ", &selection_vec);
// Ask kernel to iterate each individual file and call us back with extracted metadata
print_diag("Asking kernel to call us back for each scan row (file to read)\n");
visit_scan_data(engine_data, selection_vec, engine_context, scan_row_callback);
visit_scan_data(engine_data, selection_vec, transforms, engine_context, scan_row_callback);
free_bool_slice(selection_vec);
free_engine_data(engine_data);
}
Expand Down
26 changes: 21 additions & 5 deletions ffi/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use delta_kernel::scan::state::{visit_scan_files, DvInfo, GlobalScanState};
use delta_kernel::scan::{Scan, ScanData};
use delta_kernel::schema::Schema;
use delta_kernel::snapshot::Snapshot;
use delta_kernel::{DeltaResult, Error};
use delta_kernel::{DeltaResult, Error, ExpressionRef};
use delta_kernel_ffi_macros::handle_descriptor;
use tracing::debug;
use url::Url;
Expand Down Expand Up @@ -211,6 +211,7 @@ pub unsafe extern "C" fn kernel_scan_data_next(
engine_context: NullableCvoid,
engine_data: Handle<ExclusiveEngineData>,
selection_vector: KernelBoolSlice,
transforms: &CTransforms,
),
) -> ExternResult<bool> {
let data = unsafe { data.as_ref() };
Expand All @@ -224,15 +225,17 @@ fn kernel_scan_data_next_impl(
engine_context: NullableCvoid,
engine_data: Handle<ExclusiveEngineData>,
selection_vector: KernelBoolSlice,
transforms: &CTransforms,
),
) -> DeltaResult<bool> {
let mut data = data
.data
.lock()
.map_err(|_| Error::generic("poisoned mutex"))?;
if let Some((data, sel_vec, _transforms)) = data.next().transpose()? {
if let Some((data, sel_vec, transforms)) = data.next().transpose()? {
let bool_slice = KernelBoolSlice::from(sel_vec);
(engine_visitor)(engine_context, data.into(), bool_slice);
let transform_map = CTransforms { transforms };
(engine_visitor)(engine_context, data.into(), bool_slice, &transform_map);
Ok(true)
} else {
Ok(false)
Expand Down Expand Up @@ -281,7 +284,7 @@ pub struct CStringMap {
/// # Safety
///
/// The engine is responsible for providing a valid [`CStringMap`] pointer and [`KernelStringSlice`]
pub unsafe extern "C" fn get_from_map(
pub unsafe extern "C" fn get_from_string_map(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like a reasonable name change, but why in this PR?
(also -- do we anticipate exposing other map types through FFI in the future?)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think mostly I just noticed it while possibly having a "transform map" (which we no longer will have), and thought it was a good change. I can move it to another PR, but it does seem to make more sense this way. TBD on other types, but I imagine eventually we'll find something :)

map: &CStringMap,
key: KernelStringSlice,
allocate_fn: AllocateStringFn,
Expand All @@ -293,6 +296,10 @@ pub unsafe extern "C" fn get_from_map(
.and_then(|v| allocate_fn(kernel_string_slice!(v)))
}

/// Wrapper around the transform expressions that accompany one batch of scan data.
///
/// Holds one `Option<ExpressionRef>` slot per entry; the slots are looked up by row index when
/// the scan files are visited. The field is private, so code outside this module only ever
/// handles a `&CTransforms` and passes it back into `visit_scan_data` unchanged.
pub struct CTransforms {
    // Transform for each row, or `None` where no transform applies to that row.
    transforms: Vec<Option<ExpressionRef>>,
}

/// Get a selection vector out of a [`DvInfo`] struct
///
/// # Safety
Expand Down Expand Up @@ -355,6 +362,7 @@ fn rust_callback(
size: i64,
kernel_stats: Option<delta_kernel::scan::state::Stats>,
dv_info: DvInfo,
_transform: Option<ExpressionRef>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any particular reason not to update the callback in this PR as well, so we can pass this on to the engine?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, mostly I was trying to keep it separated. This PR was focused on getting visit_scan_files to pass things along. If I update the callback here, I have to update all the C code too, which brings a bunch more into this PR, and I thought it was better to keep all the C changes to part 4.

partition_values: HashMap<String, String>,
) {
let partition_map = CStringMap {
Expand Down Expand Up @@ -388,6 +396,7 @@ struct ContextWrapper {
pub unsafe extern "C" fn visit_scan_data(
data: Handle<ExclusiveEngineData>,
selection_vec: KernelBoolSlice,
transforms: &CTransforms,
engine_context: NullableCvoid,
callback: CScanCallback,
) {
Expand All @@ -398,5 +407,12 @@ pub unsafe extern "C" fn visit_scan_data(
callback,
};
// TODO: return ExternResult to caller instead of panicking?
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems pretty easy to address this todo? maybe we can go ahead and do that

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That will require changes in the C code, which I was mostly trying to avoid here. I added a reminder for myself here for when I do the final fixup that requires lots of C changes.

visit_scan_files(data, selection_vec, context_wrapper, rust_callback).unwrap();
visit_scan_files(
data,
selection_vec,
&transforms.transforms,
context_wrapper,
rust_callback,
)
.unwrap();
}
7 changes: 5 additions & 2 deletions kernel/examples/inspect-table/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use delta_kernel::expressions::ColumnName;
use delta_kernel::scan::state::{DvInfo, Stats};
use delta_kernel::scan::ScanBuilder;
use delta_kernel::schema::{ColumnNamesAndTypes, DataType};
use delta_kernel::{DeltaResult, Error, Table};
use delta_kernel::{DeltaResult, Error, ExpressionRef, Table};

use std::collections::HashMap;
use std::process::ExitCode;
Expand Down Expand Up @@ -163,6 +163,7 @@ fn print_scan_file(
size: i64,
stats: Option<Stats>,
dv_info: DvInfo,
transform: Option<ExpressionRef>,
partition_values: HashMap<String, String>,
) {
let num_record_str = if let Some(s) = stats {
Expand All @@ -176,6 +177,7 @@ fn print_scan_file(
Size (bytes):\t{size}\n \
Num Records:\t{num_record_str}\n \
Has DV?:\t{}\n \
Transform:\t{transform:?}\n \
Part Vals:\t{partition_values:?}",
dv_info.has_vector()
);
Expand Down Expand Up @@ -209,10 +211,11 @@ fn try_main() -> DeltaResult<()> {
let scan = ScanBuilder::new(snapshot).build()?;
let scan_data = scan.scan_data(&engine)?;
for res in scan_data {
let (data, vector, _transforms) = res?;
let (data, vector, transforms) = res?;
delta_kernel::scan::state::visit_scan_files(
data.as_ref(),
&vector,
&transforms,
(),
print_scan_file,
)?;
Expand Down
6 changes: 4 additions & 2 deletions kernel/examples/read-table-multi-threaded/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use delta_kernel::engine::sync::SyncEngine;
use delta_kernel::scan::state::{DvInfo, GlobalScanState, Stats};
use delta_kernel::scan::transform_to_logical;
use delta_kernel::schema::Schema;
use delta_kernel::{DeltaResult, Engine, EngineData, FileMeta, Table};
use delta_kernel::{DeltaResult, Engine, EngineData, ExpressionRef, FileMeta, Table};

use clap::{Parser, ValueEnum};
use url::Url;
Expand Down Expand Up @@ -111,6 +111,7 @@ fn send_scan_file(
size: i64,
_stats: Option<Stats>,
dv_info: DvInfo,
_transform: Option<ExpressionRef>,
partition_values: HashMap<String, String>,
) {
let scan_file = ScanFile {
Expand Down Expand Up @@ -210,10 +211,11 @@ fn try_main() -> DeltaResult<()> {
drop(record_batch_tx);

for res in scan_data {
let (data, vector, _transforms) = res?;
let (data, vector, transforms) = res?;
scan_file_tx = delta_kernel::scan::state::visit_scan_files(
data.as_ref(),
&vector,
&transforms,
scan_file_tx,
send_scan_file,
)?;
Expand Down
1 change: 1 addition & 0 deletions kernel/src/scan/log_replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ mod tests {
size: i64,
stats: Option<Stats>,
_: DvInfo,
_: Option<ExpressionRef>,
part_vals: HashMap<String, String>,
) {
assert_eq!(
Expand Down
35 changes: 30 additions & 5 deletions kernel/src/scan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,16 @@ pub enum ColumnType {
/// A transform is ultimately a `Struct` expr. This holds the set of expressions that make that struct expr up
type Transform = Vec<TransformExpr>;

/// Looks up the transform expression that applies to a single row.
///
/// Returns a clone of the entry stored at index `row` of `transforms`. The result is `None` both
/// when `row` lies past the end of the slice and when the slot at that index is itself `None`.
pub fn get_transform_for_row(
    row: usize,
    transforms: &[Option<ExpressionRef>],
) -> Option<ExpressionRef> {
    // An out-of-range index and an empty slot both collapse to `None`.
    transforms.get(row).and_then(|slot| slot.clone())
}

/// Transforms aren't computed all at once. So static ones can just go straight to `Expression`, but
/// things like partition columns need to be filled in. This enum holds an expression that's part of a
/// `Transform`.
Expand Down Expand Up @@ -463,6 +473,7 @@ impl Scan {
size: i64,
_: Option<Stats>,
dv_info: DvInfo,
_transform: Option<ExpressionRef>,
partition_values: HashMap<String, String>,
) {
batches.push(ScanFile {
Expand All @@ -487,9 +498,15 @@ impl Scan {
let scan_data = self.scan_data(engine.as_ref())?;
let scan_files_iter = scan_data
.map(|res| {
let (data, vec, _transforms) = res?;
let (data, vec, transforms) = res?;
let scan_files = vec![];
state::visit_scan_files(data.as_ref(), &vec, scan_files, scan_data_callback)
state::visit_scan_files(
data.as_ref(),
&vec,
&transforms,
scan_files,
scan_data_callback,
)
})
// Iterator<DeltaResult<Vec<ScanFile>>> to Iterator<DeltaResult<ScanFile>>
.flatten_ok();
Expand Down Expand Up @@ -816,11 +833,12 @@ pub(crate) mod test_utils {
);
let mut batch_count = 0;
for res in iter {
let (batch, sel, _transforms) = res.unwrap();
let (batch, sel, transforms) = res.unwrap();
assert_eq!(sel, expected_sel_vec);
crate::scan::state::visit_scan_files(
batch.as_ref(),
&sel,
&transforms,
context.clone(),
validate_callback,
)
Expand Down Expand Up @@ -1020,15 +1038,22 @@ mod tests {
_size: i64,
_: Option<Stats>,
dv_info: DvInfo,
_transform: Option<ExpressionRef>,
_partition_values: HashMap<String, String>,
) {
paths.push(path.to_string());
assert!(dv_info.deletion_vector.is_none());
}
let mut files = vec![];
for data in scan_data {
let (data, vec, _transforms) = data?;
files = state::visit_scan_files(data.as_ref(), &vec, files, scan_data_callback)?;
let (data, vec, transforms) = data?;
files = state::visit_scan_files(
data.as_ref(),
&vec,
&transforms,
files,
scan_data_callback,
)?;
}
Ok(files)
}
Expand Down
10 changes: 10 additions & 0 deletions kernel/src/scan/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ use std::collections::HashMap;
use std::sync::LazyLock;

use crate::actions::deletion_vector::deletion_treemap_to_bools;
use crate::scan::get_transform_for_row;
use crate::utils::require;
use crate::ExpressionRef;
use crate::{
actions::{deletion_vector::DeletionVectorDescriptor, visitors::visit_deletion_vector_at},
engine_data::{GetData, RowVisitor, TypedGetData as _},
Expand Down Expand Up @@ -104,6 +106,7 @@ pub type ScanCallback<T> = fn(
size: i64,
stats: Option<Stats>,
dv_info: DvInfo,
transform: Option<ExpressionRef>,
partition_values: HashMap<String, String>,
);

Expand Down Expand Up @@ -138,12 +141,14 @@ pub type ScanCallback<T> = fn(
pub fn visit_scan_files<T>(
data: &dyn EngineData,
selection_vector: &[bool],
transforms: &[Option<ExpressionRef>],
context: T,
callback: ScanCallback<T>,
) -> DeltaResult<T> {
let mut visitor = ScanFileVisitor {
callback,
selection_vector,
transforms,
context,
};
visitor.visit_rows_of(data)?;
Expand All @@ -154,6 +159,7 @@ pub fn visit_scan_files<T>(
struct ScanFileVisitor<'a, T> {
callback: ScanCallback<T>,
selection_vector: &'a [bool],
transforms: &'a [Option<ExpressionRef>],
context: T,
}
impl<T> RowVisitor for ScanFileVisitor<'_, T> {
Expand Down Expand Up @@ -201,6 +207,7 @@ impl<T> RowVisitor for ScanFileVisitor<'_, T> {
size,
stats,
dv_info,
get_transform_for_row(row_index, self.transforms),
partition_values,
)
}
Expand All @@ -214,6 +221,7 @@ mod tests {
use std::collections::HashMap;

use crate::scan::test_utils::{add_batch_simple, run_with_validate_callback};
use crate::ExpressionRef;

use super::{DvInfo, Stats};

Expand All @@ -228,6 +236,7 @@ mod tests {
size: i64,
stats: Option<Stats>,
dv_info: DvInfo,
transform: Option<ExpressionRef>,
part_vals: HashMap<String, String>,
) {
assert_eq!(
Expand All @@ -242,6 +251,7 @@ mod tests {
assert!(dv_info.deletion_vector.is_some());
let dv = dv_info.deletion_vector.unwrap();
assert_eq!(dv.unique_id(), "uvBn[lx{q8@P<9BNH/isA@1");
assert!(transform.is_none());
assert_eq!(context.id, 2);
}

Expand Down
13 changes: 10 additions & 3 deletions kernel/tests/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use delta_kernel::actions::deletion_vector::split_vector;
use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
use delta_kernel::engine::default::DefaultEngine;
use delta_kernel::expressions::{column_expr, BinaryOperator, Expression};
use delta_kernel::expressions::{column_expr, BinaryOperator, Expression, ExpressionRef};
use delta_kernel::scan::state::{visit_scan_files, DvInfo, Stats};
use delta_kernel::scan::{transform_to_logical, Scan};
use delta_kernel::schema::{DataType, Schema};
Expand Down Expand Up @@ -348,6 +348,7 @@ fn scan_data_callback(
size: i64,
_stats: Option<Stats>,
dv_info: DvInfo,
_transforms: Option<ExpressionRef>,
partition_values: HashMap<String, String>,
) {
batches.push(ScanFile {
Expand All @@ -369,8 +370,14 @@ fn read_with_scan_data(
let scan_data = scan.scan_data(engine)?;
let mut scan_files = vec![];
for data in scan_data {
let (data, vec, _transforms) = data?;
scan_files = visit_scan_files(data.as_ref(), &vec, scan_files, scan_data_callback)?;
let (data, vec, transforms) = data?;
scan_files = visit_scan_files(
data.as_ref(),
&vec,
&transforms,
scan_files,
scan_data_callback,
)?;
}

let mut batches = vec![];
Expand Down
Loading