Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Part 3 of expression based transform: Use computed transform #613

Draft
wants to merge 12 commits into
base: main
Choose a base branch
from
2 changes: 1 addition & 1 deletion ffi/examples/read-table/arrow.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ static GArrowRecordBatch* add_partition_columns(
char* col = partition_cols->cols[i];
guint pos = cols + i;
KernelStringSlice key = { col, strlen(col) };
char* partition_val = get_from_map(partition_values, key, allocate_string);
char* partition_val = get_from_string_map(partition_values, key, allocate_string);
print_diag(
" Adding partition column '%s' with value '%s' at column %u\n",
col,
Expand Down
7 changes: 4 additions & 3 deletions ffi/examples/read-table/read_table.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ void print_partition_info(struct EngineContext* context, const CStringMap* parti
for (uintptr_t i = 0; i < context->partition_cols->len; i++) {
char* col = context->partition_cols->cols[i];
KernelStringSlice key = { col, strlen(col) };
char* partition_val = get_from_map(partition_values, key, allocate_string);
char* partition_val = get_from_string_map(partition_values, key, allocate_string);
if (partition_val) {
print_diag(" partition '%s' here: %s\n", col, partition_val);
free(partition_val);
Expand Down Expand Up @@ -87,14 +87,15 @@ void scan_row_callback(
void do_visit_scan_data(
void* engine_context,
ExclusiveEngineData* engine_data,
KernelBoolSlice selection_vec)
KernelBoolSlice selection_vec,
const CTransforms* transforms)
{
print_diag("\nScan iterator found some data to read\n Of this data, here is "
"a selection vector\n");
print_selection_vector(" ", &selection_vec);
// Ask kernel to iterate each individual file and call us back with extracted metadata
print_diag("Asking kernel to call us back for each scan row (file to read)\n");
visit_scan_data(engine_data, selection_vec, engine_context, scan_row_callback);
visit_scan_data(engine_data, selection_vec, transforms, engine_context, scan_row_callback);
free_bool_slice(selection_vec);
free_engine_data(engine_data);
}
Expand Down
26 changes: 21 additions & 5 deletions ffi/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use delta_kernel::scan::state::{visit_scan_files, DvInfo, GlobalScanState};
use delta_kernel::scan::{Scan, ScanData};
use delta_kernel::schema::Schema;
use delta_kernel::snapshot::Snapshot;
use delta_kernel::{DeltaResult, Error};
use delta_kernel::{DeltaResult, Error, ExpressionRef};
use delta_kernel_ffi_macros::handle_descriptor;
use tracing::debug;
use url::Url;
Expand Down Expand Up @@ -211,6 +211,7 @@ pub unsafe extern "C" fn kernel_scan_data_next(
engine_context: NullableCvoid,
engine_data: Handle<ExclusiveEngineData>,
selection_vector: KernelBoolSlice,
transforms: &CTransforms,
),
) -> ExternResult<bool> {
let data = unsafe { data.as_ref() };
Expand All @@ -224,15 +225,17 @@ fn kernel_scan_data_next_impl(
engine_context: NullableCvoid,
engine_data: Handle<ExclusiveEngineData>,
selection_vector: KernelBoolSlice,
transforms: &CTransforms,
),
) -> DeltaResult<bool> {
let mut data = data
.data
.lock()
.map_err(|_| Error::generic("poisoned mutex"))?;
if let Some((data, sel_vec, _transforms)) = data.next().transpose()? {
if let Some((data, sel_vec, transforms)) = data.next().transpose()? {
let bool_slice = KernelBoolSlice::from(sel_vec);
(engine_visitor)(engine_context, data.into(), bool_slice);
let transform_map = CTransforms { transforms };
(engine_visitor)(engine_context, data.into(), bool_slice, &transform_map);
Ok(true)
} else {
Ok(false)
Expand Down Expand Up @@ -281,7 +284,7 @@ pub struct CStringMap {
/// # Safety
///
/// The engine is responsible for providing a valid [`CStringMap`] pointer and [`KernelStringSlice`]
pub unsafe extern "C" fn get_from_map(
pub unsafe extern "C" fn get_from_string_map(
map: &CStringMap,
key: KernelStringSlice,
allocate_fn: AllocateStringFn,
Expand All @@ -293,6 +296,10 @@ pub unsafe extern "C" fn get_from_map(
.and_then(|v| allocate_fn(kernel_string_slice!(v)))
}

pub struct CTransforms {
transforms: Vec<Option<ExpressionRef>>,
}

/// Get a selection vector out of a [`DvInfo`] struct
///
/// # Safety
Expand Down Expand Up @@ -355,6 +362,7 @@ fn rust_callback(
size: i64,
kernel_stats: Option<delta_kernel::scan::state::Stats>,
dv_info: DvInfo,
_transform: Option<ExpressionRef>,
partition_values: HashMap<String, String>,
) {
let partition_map = CStringMap {
Expand Down Expand Up @@ -388,6 +396,7 @@ struct ContextWrapper {
pub unsafe extern "C" fn visit_scan_data(
data: Handle<ExclusiveEngineData>,
selection_vec: KernelBoolSlice,
transforms: &CTransforms,
engine_context: NullableCvoid,
callback: CScanCallback,
) {
Expand All @@ -398,5 +407,12 @@ pub unsafe extern "C" fn visit_scan_data(
callback,
};
// TODO: return ExternResult to caller instead of panicking?
visit_scan_files(data, selection_vec, context_wrapper, rust_callback).unwrap();
visit_scan_files(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this formatting that should have been applied in an earlier PR?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it was applied earlier, it's included in part-2. not quite sure why it's showing up here but it'll go away once earlier PRs merge

data,
selection_vec,
&transforms.transforms,
context_wrapper,
rust_callback,
)
.unwrap();
}
7 changes: 5 additions & 2 deletions kernel/examples/inspect-table/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use delta_kernel::expressions::ColumnName;
use delta_kernel::scan::state::{DvInfo, Stats};
use delta_kernel::scan::ScanBuilder;
use delta_kernel::schema::{ColumnNamesAndTypes, DataType};
use delta_kernel::{DeltaResult, Error, Table};
use delta_kernel::{DeltaResult, Error, ExpressionRef, Table};

use std::collections::HashMap;
use std::process::ExitCode;
Expand Down Expand Up @@ -163,6 +163,7 @@ fn print_scan_file(
size: i64,
stats: Option<Stats>,
dv_info: DvInfo,
transform: Option<ExpressionRef>,
partition_values: HashMap<String, String>,
) {
let num_record_str = if let Some(s) = stats {
Expand All @@ -176,6 +177,7 @@ fn print_scan_file(
Size (bytes):\t{size}\n \
Num Records:\t{num_record_str}\n \
Has DV?:\t{}\n \
Transform:\t{transform:?}\n \
Part Vals:\t{partition_values:?}",
dv_info.has_vector()
);
Expand Down Expand Up @@ -209,10 +211,11 @@ fn try_main() -> DeltaResult<()> {
let scan = ScanBuilder::new(snapshot).build()?;
let scan_data = scan.scan_data(&engine)?;
for res in scan_data {
let (data, vector, _transforms) = res?;
let (data, vector, transforms) = res?;
delta_kernel::scan::state::visit_scan_files(
data.as_ref(),
&vector,
&transforms,
(),
print_scan_file,
)?;
Expand Down
26 changes: 13 additions & 13 deletions kernel/examples/read-table-multi-threaded/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@ use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
use delta_kernel::engine::default::DefaultEngine;
use delta_kernel::engine::sync::SyncEngine;
use delta_kernel::scan::state::{DvInfo, GlobalScanState, Stats};
use delta_kernel::scan::transform_to_logical;
use delta_kernel::scan::state::{transform_to_logical, DvInfo, GlobalScanState, Stats};
use delta_kernel::schema::Schema;
use delta_kernel::{DeltaResult, Engine, EngineData, FileMeta, Table};
use delta_kernel::{DeltaResult, Engine, EngineData, ExpressionRef, FileMeta, Table};

use clap::{Parser, ValueEnum};
use url::Url;
Expand Down Expand Up @@ -81,7 +80,7 @@ fn main() -> ExitCode {
struct ScanFile {
path: String,
size: i64,
partition_values: HashMap<String, String>,
transform: Option<ExpressionRef>,
dv_info: DvInfo,
}

Expand Down Expand Up @@ -111,12 +110,13 @@ fn send_scan_file(
size: i64,
_stats: Option<Stats>,
dv_info: DvInfo,
partition_values: HashMap<String, String>,
transform: Option<ExpressionRef>,
_: HashMap<String, String>,
) {
let scan_file = ScanFile {
path: path.to_string(),
size,
partition_values,
transform,
dv_info,
};
scan_tx.send(scan_file).unwrap();
Expand Down Expand Up @@ -210,10 +210,11 @@ fn try_main() -> DeltaResult<()> {
drop(record_batch_tx);

for res in scan_data {
let (data, vector, _transforms) = res?;
let (data, vector, transforms) = res?;
scan_file_tx = delta_kernel::scan::state::visit_scan_files(
data.as_ref(),
&vector,
&transforms,
scan_file_tx,
send_scan_file,
)?;
Expand Down Expand Up @@ -256,7 +257,6 @@ fn do_work(
) {
// get the type for the function calls
let engine: &dyn Engine = engine.as_ref();
let physical_schema = scan_state.physical_schema.clone();
// in a loop, try and get a ScanFile. Note that `recv` will return an `Err` when the other side
// hangs up, which indicates there's no more data to process.
while let Ok(scan_file) = scan_file_rx.recv() {
Expand Down Expand Up @@ -287,19 +287,19 @@ fn do_work(
// vector
let read_results = engine
.get_parquet_handler()
.read_parquet_files(&[meta], physical_schema.clone(), None)
.read_parquet_files(&[meta], scan_state.physical_schema.clone(), None)
.unwrap();

for read_result in read_results {
let read_result = read_result.unwrap();
let len = read_result.len();

// ask the kernel to transform the physical data into the correct logical form
// transform the physical data into the correct logical form
let logical = transform_to_logical(
engine,
read_result,
&scan_state,
&scan_file.partition_values,
&scan_state.physical_schema,
&scan_state.logical_schema,
&scan_file.transform,
)
.unwrap();

Expand Down
1 change: 1 addition & 0 deletions kernel/src/scan/log_replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ mod tests {
size: i64,
stats: Option<Stats>,
_: DvInfo,
_: Option<ExpressionRef>,
part_vals: HashMap<String, String>,
) {
assert_eq!(
Expand Down
Loading
Loading