Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: update ScanCallback to use ScanFile struct #576

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
19 changes: 6 additions & 13 deletions ffi/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,26 +349,19 @@ fn row_indexes_from_dv_impl(

// Wrapper function that gets called by the kernel, transforms the arguments to make the ffi-able,
// and then calls the ffi specified callback
fn rust_callback(
context: &mut ContextWrapper,
path: &str,
size: i64,
kernel_stats: Option<delta_kernel::scan::state::Stats>,
dv_info: DvInfo,
partition_values: HashMap<String, String>,
) {
fn rust_callback(context: &mut ContextWrapper, file: ScanFile) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't work at all. See test failures.

In particular:

  • import ScanFile
  • You can't create a string slice from a String, so you need something like:
    let path = file.path.as_str();
    (context.callback)(
        context.engine_context,
        kernel_string_slice!(path),
        file.size,
        stats.as_ref(),
        &file.dv_info,
        &partition_map,
    );

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah makes sense, fixed. Is there to run these tests locally? I took a look at the GH action but seems like a non-trivial amount of setup.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you should be able to just run cargo test in the ffi crate locally

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm also not seeing a fix? I think you might need to push

let partition_map = CStringMap {
values: partition_values,
values: file.partition_values,
};
let stats = kernel_stats.map(|ks| Stats {
let stats = file.stats.map(|ks| Stats {
num_records: ks.num_records,
});
(context.callback)(
context.engine_context,
kernel_string_slice!(path),
size,
kernel_string_slice!(file.path),
file.size,
stats.as_ref(),
&dv_info,
&file.dv_info,
&partition_map,
);
}
Expand Down
19 changes: 7 additions & 12 deletions kernel/examples/read-table-multi-threaded/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ fn main() -> ExitCode {
struct ScanFile {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need the ScanFile type defined here anymore. Since the kernel now has a similar struct, just use that one.

Same comment for all the other custom ScanFile defs.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah makes sense, looks pretty clean now imo

path: String,
size: i64,
stats: Option<Stats>,
partition_values: HashMap<String, String>,
dv_info: DvInfo,
}
Expand All @@ -105,19 +106,13 @@ fn truncate_batch(batch: RecordBatch, rows: usize) -> RecordBatch {
}

// This is the callback that will be called fo each valid scan row
fn send_scan_file(
scan_tx: &mut spmc::Sender<ScanFile>,
path: &str,
size: i64,
_stats: Option<Stats>,
dv_info: DvInfo,
partition_values: HashMap<String, String>,
) {
fn send_scan_file(scan_tx: &mut spmc::Sender<ScanFile>, file: delta_kernel::scan::state::ScanFile) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Likewise here, no need to unwrap and then re-wrap the struct, just use the kernel one

let scan_file = ScanFile {
path: path.to_string(),
size,
partition_values,
dv_info,
path: file.path.to_string(),
size: file.size,
stats: file.stats,
partition_values: file.partition_values,
dv_info: file.dv_info,
};
scan_tx.send(scan_file).unwrap();
}
Expand Down
28 changes: 11 additions & 17 deletions kernel/src/scan/log_replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,32 +256,26 @@ pub fn scan_action_iter(

#[cfg(test)]
mod tests {
use std::collections::HashMap;

use crate::scan::{
state::{DvInfo, Stats},
state::{self, ScanFile},
test_utils::{add_batch_simple, add_batch_with_remove, run_with_validate_callback},
};

// dv-info is more complex to validate, we validate that works in the test for visit_scan_files
// in state.rs
fn validate_simple(
_: &mut (),
path: &str,
size: i64,
stats: Option<Stats>,
_: DvInfo,
part_vals: HashMap<String, String>,
) {
fn validate_simple(_: &mut (), file: state::ScanFile<'_>) {
assert_eq!(
path,
file.path,
"part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet"
);
assert_eq!(size, 635);
assert!(stats.is_some());
assert_eq!(stats.as_ref().unwrap().num_records, 10);
assert_eq!(part_vals.get("date"), Some(&"2017-12-10".to_string()));
assert_eq!(part_vals.get("non-existent"), None);
assert_eq!(file.size, 635);
assert!(file.stats.is_some());
assert_eq!(file.stats.as_ref().unwrap().num_records, 10);
assert_eq!(
file.partition_values.get("date"),
Some(&"2017-12-10".to_string())
);
assert_eq!(file.partition_values.get("non-existent"), None);
}

#[test]
Expand Down
32 changes: 10 additions & 22 deletions kernel/src/scan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,22 +431,17 @@ impl Scan {
struct ScanFile {
path: String,
size: i64,
stats: Option<Stats>,
dv_info: DvInfo,
partition_values: HashMap<String, String>,
}
fn scan_data_callback(
batches: &mut Vec<ScanFile>,
path: &str,
size: i64,
_: Option<Stats>,
dv_info: DvInfo,
partition_values: HashMap<String, String>,
) {
fn scan_data_callback(batches: &mut Vec<ScanFile>, file: state::ScanFile<'_>) {
batches.push(ScanFile {
path: path.to_string(),
size,
dv_info,
partition_values,
path: file.path.to_string(),
size: file.size,
stats: file.stats,
dv_info: file.dv_info,
partition_values: file.partition_values,
});
}

Expand Down Expand Up @@ -963,16 +958,9 @@ mod tests {

fn get_files_for_scan(scan: Scan, engine: &dyn Engine) -> DeltaResult<Vec<String>> {
let scan_data = scan.scan_data(engine)?;
fn scan_data_callback(
paths: &mut Vec<String>,
path: &str,
_size: i64,
_: Option<Stats>,
dv_info: DvInfo,
_partition_values: HashMap<String, String>,
) {
paths.push(path.to_string());
assert!(dv_info.deletion_vector.is_none());
fn scan_data_callback(paths: &mut Vec<String>, file: state::ScanFile<'_>) {
paths.push(file.path.to_string());
assert!(file.dv_info.deletion_vector.is_none());
}
let mut files = vec![];
for data in scan_data {
Expand Down
56 changes: 28 additions & 28 deletions kernel/src/scan/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,17 @@ impl DvInfo {
}
}

pub type ScanCallback<T> = fn(
context: &mut T,
path: &str,
size: i64,
stats: Option<Stats>,
dv_info: DvInfo,
partition_values: HashMap<String, String>,
);
/// A struct containing all the information needed for a scan file callback
#[derive(Debug, Clone)]
pub struct ScanFile<'a> {
pub path: &'a str,
pub size: i64,
pub stats: Option<Stats>,
pub dv_info: DvInfo,
pub partition_values: HashMap<String, String>,
}

pub type ScanCallback<T> = fn(context: &mut T, file: ScanFile<'_>);

/// Request that the kernel call a callback on each valid file that needs to be read for the
/// scan.
Expand Down Expand Up @@ -195,14 +198,15 @@ impl<T> RowVisitor for ScanFileVisitor<'_, T> {
let dv_info = DvInfo { deletion_vector };
let partition_values =
getters[9].get(row_index, "scanFile.fileConstantValues.partitionValues")?;
(self.callback)(
&mut self.context,

let scan_file = ScanFile {
path,
size,
stats,
dv_info,
partition_values,
)
};
(self.callback)(&mut self.context, scan_file)
}
}
Ok(())
Expand All @@ -215,32 +219,28 @@ mod tests {

use crate::scan::test_utils::{add_batch_simple, run_with_validate_callback};

use super::{DvInfo, Stats};
use super::{DvInfo, ScanFile, Stats};

#[derive(Clone)]
struct TestContext {
id: usize,
}

fn validate_visit(
context: &mut TestContext,
path: &str,
size: i64,
stats: Option<Stats>,
dv_info: DvInfo,
part_vals: HashMap<String, String>,
) {
fn validate_visit(context: &mut TestContext, file: ScanFile<'_>) {
assert_eq!(
path,
file.path,
"part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet"
);
assert_eq!(size, 635);
assert!(stats.is_some());
assert_eq!(stats.as_ref().unwrap().num_records, 10);
assert_eq!(part_vals.get("date"), Some(&"2017-12-10".to_string()));
assert_eq!(part_vals.get("non-existent"), None);
assert!(dv_info.deletion_vector.is_some());
let dv = dv_info.deletion_vector.unwrap();
assert_eq!(file.size, 635);
assert!(file.stats.is_some());
assert_eq!(file.stats.as_ref().unwrap().num_records, 10);
assert_eq!(
file.partition_values.get("date"),
Some(&"2017-12-10".to_string())
);
assert_eq!(file.partition_values.get("non-existent"), None);
assert!(file.dv_info.deletion_vector.is_some());
let dv = file.dv_info.deletion_vector.unwrap();
assert_eq!(dv.unique_id(), "uvBn[lx{q8@P<9BNH/isA@1");
assert_eq!(context.id, 2);
}
Expand Down
22 changes: 8 additions & 14 deletions kernel/tests/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
use delta_kernel::engine::default::DefaultEngine;
use delta_kernel::expressions::{column_expr, BinaryOperator, Expression};
use delta_kernel::scan::state::{visit_scan_files, DvInfo, Stats};
use delta_kernel::scan::state;
use delta_kernel::scan::state::{visit_scan_files, DvInfo};
use delta_kernel::scan::{transform_to_logical, Scan};
use delta_kernel::schema::{DataType, Schema};
use delta_kernel::{Engine, FileMeta, Table};
Expand Down Expand Up @@ -342,19 +343,12 @@ struct ScanFile {
partition_values: HashMap<String, String>,
}

fn scan_data_callback(
batches: &mut Vec<ScanFile>,
path: &str,
size: i64,
_stats: Option<Stats>,
dv_info: DvInfo,
partition_values: HashMap<String, String>,
) {
batches.push(ScanFile {
path: path.to_string(),
size,
dv_info,
partition_values,
fn scan_data_callback(files: &mut Vec<ScanFile>, file: state::ScanFile<'_>) {
files.push(ScanFile {
path: file.path.to_string(),
size: file.size,
dv_info: file.dv_info,
partition_values: file.partition_values,
});
}

Expand Down
Loading