Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: update ScanCallback to use ScanFile struct #576

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
19 changes: 6 additions & 13 deletions ffi/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,26 +349,19 @@ fn row_indexes_from_dv_impl(

// Wrapper function that gets called by the kernel, transforms the arguments to make the ffi-able,
// and then calls the ffi specified callback
fn rust_callback(
context: &mut ContextWrapper,
path: &str,
size: i64,
kernel_stats: Option<delta_kernel::scan::state::Stats>,
dv_info: DvInfo,
partition_values: HashMap<String, String>,
) {
fn rust_callback(context: &mut ContextWrapper, file: ScanFile) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't work at all. See test failures.

In particular:

  • import ScanFile
  • You can't create a string slice from a String, so you need something like:
    let path = file.path.as_str();
    (context.callback)(
        context.engine_context,
        kernel_string_slice!(path),
        file.size,
        stats.as_ref(),
        &file.dv_info,
        &partition_map,
    );

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah makes sense, fixed. Is there to run these tests locally? I took a look at the GH action but seems like a non-trivial amount of setup.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you should be able to just run cargo test in the ffi crate locally

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm also not seeing a fix? I think you might need to push

let partition_map = CStringMap {
values: partition_values,
values: file.partition_values,
};
let stats = kernel_stats.map(|ks| Stats {
let stats = file.stats.map(|ks| Stats {
num_records: ks.num_records,
});
(context.callback)(
context.engine_context,
kernel_string_slice!(path),
size,
kernel_string_slice!(file.path),
file.size,
stats.as_ref(),
&dv_info,
&file.dv_info,
&partition_map,
);
}
Expand Down
30 changes: 4 additions & 26 deletions kernel/examples/read-table-multi-threaded/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
use delta_kernel::engine::default::DefaultEngine;
use delta_kernel::engine::sync::SyncEngine;
use delta_kernel::scan::state::{DvInfo, GlobalScanState, Stats};
use delta_kernel::scan::state::{DvInfo, GlobalScanState, ScanFile, Stats};
use delta_kernel::scan::transform_to_logical;
use delta_kernel::schema::Schema;
use delta_kernel::{DeltaResult, Engine, EngineData, FileMeta, Table};
Expand Down Expand Up @@ -76,15 +76,6 @@ fn main() -> ExitCode {
}
}

// the way we as a connector represent data to scan. this is computed from the raw data returned
// from the scan, and could be any format the engine chooses to use to facilitate distributing work.
struct ScanFile {
path: String,
size: i64,
partition_values: HashMap<String, String>,
dv_info: DvInfo,
}

// we know we're using arrow under the hood, so cast an EngineData into something we can work with
fn to_arrow(data: Box<dyn EngineData>) -> DeltaResult<RecordBatch> {
Ok(data
Expand All @@ -104,22 +95,9 @@ fn truncate_batch(batch: RecordBatch, rows: usize) -> RecordBatch {
RecordBatch::try_new(batch.schema(), cols).unwrap()
}

// This is the callback that will be called fo each valid scan row
fn send_scan_file(
scan_tx: &mut spmc::Sender<ScanFile>,
path: &str,
size: i64,
_stats: Option<Stats>,
dv_info: DvInfo,
partition_values: HashMap<String, String>,
) {
let scan_file = ScanFile {
path: path.to_string(),
size,
partition_values,
dv_info,
};
scan_tx.send(scan_file).unwrap();
// This is the callback that will be called for each valid scan row
fn send_scan_file(scan_tx: &mut spmc::Sender<ScanFile>, file: ScanFile) {
scan_tx.send(file).unwrap();
}

fn try_main() -> DeltaResult<()> {
Expand Down
28 changes: 11 additions & 17 deletions kernel/src/scan/log_replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,32 +256,26 @@ pub fn scan_action_iter(

#[cfg(test)]
mod tests {
use std::collections::HashMap;

use crate::scan::{
state::{DvInfo, Stats},
state,
test_utils::{add_batch_simple, add_batch_with_remove, run_with_validate_callback},
};

// dv-info is more complex to validate, we validate that works in the test for visit_scan_files
// in state.rs
fn validate_simple(
_: &mut (),
path: &str,
size: i64,
stats: Option<Stats>,
_: DvInfo,
part_vals: HashMap<String, String>,
) {
fn validate_simple(_: &mut (), file: state::ScanFile) {
assert_eq!(
path,
file.path,
"part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet"
);
assert_eq!(size, 635);
assert!(stats.is_some());
assert_eq!(stats.as_ref().unwrap().num_records, 10);
assert_eq!(part_vals.get("date"), Some(&"2017-12-10".to_string()));
assert_eq!(part_vals.get("non-existent"), None);
assert_eq!(file.size, 635);
assert!(file.stats.is_some());
assert_eq!(file.stats.as_ref().unwrap().num_records, 10);
assert_eq!(
file.partition_values.get("date"),
Some(&"2017-12-10".to_string())
);
assert_eq!(file.partition_values.get("non-existent"), None);
}

#[test]
Expand Down
38 changes: 5 additions & 33 deletions kernel/src/scan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use crate::actions::deletion_vector::{
};
use crate::actions::{get_log_add_schema, get_log_schema, ADD_NAME, REMOVE_NAME};
use crate::expressions::{ColumnName, Expression, ExpressionRef, ExpressionTransform, Scalar};
use crate::scan::state::{DvInfo, Stats};
use crate::schema::{
ArrayType, DataType, MapType, PrimitiveType, Schema, SchemaRef, SchemaTransform, StructField,
StructType,
Expand Down Expand Up @@ -428,33 +427,13 @@ impl Scan {
&self,
engine: Arc<dyn Engine>,
) -> DeltaResult<impl Iterator<Item = DeltaResult<ScanResult>> + '_> {
struct ScanFile {
path: String,
size: i64,
dv_info: DvInfo,
partition_values: HashMap<String, String>,
fn scan_data_callback(batches: &mut Vec<state::ScanFile>, file: state::ScanFile) {
batches.push(file);
}
fn scan_data_callback(
batches: &mut Vec<ScanFile>,
path: &str,
size: i64,
_: Option<Stats>,
dv_info: DvInfo,
partition_values: HashMap<String, String>,
) {
batches.push(ScanFile {
path: path.to_string(),
size,
dv_info,
partition_values,
});
}

debug!(
"Executing scan with logical schema {:#?} and physical schema {:#?}",
self.logical_schema, self.physical_schema
);

let global_state = Arc::new(self.global_scan_state());
let scan_data = self.scan_data(engine.as_ref())?;
let scan_files_iter = scan_data
Expand Down Expand Up @@ -963,16 +942,9 @@ mod tests {

fn get_files_for_scan(scan: Scan, engine: &dyn Engine) -> DeltaResult<Vec<String>> {
let scan_data = scan.scan_data(engine)?;
fn scan_data_callback(
paths: &mut Vec<String>,
path: &str,
_size: i64,
_: Option<Stats>,
dv_info: DvInfo,
_partition_values: HashMap<String, String>,
) {
paths.push(path.to_string());
assert!(dv_info.deletion_vector.is_none());
fn scan_data_callback(paths: &mut Vec<String>, file: state::ScanFile) {
paths.push(file.path);
assert!(file.dv_info.deletion_vector.is_none());
}
let mut files = vec![];
for data in scan_data {
Expand Down
63 changes: 31 additions & 32 deletions kernel/src/scan/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,17 @@ impl DvInfo {
}
}

pub type ScanCallback<T> = fn(
context: &mut T,
path: &str,
size: i64,
stats: Option<Stats>,
dv_info: DvInfo,
partition_values: HashMap<String, String>,
);
/// A struct containing all the information needed for a scan file callback
#[derive(Debug, Clone)]
pub struct ScanFile {
pub path: String,
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I went down the lifetime rabbithole to see if i could keep it as &str and it ended up being really complicated and doubtful it makes a difference perf wise.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, we were copying it anyway before, just one layer lower

pub size: i64,
pub stats: Option<Stats>,
pub dv_info: DvInfo,
pub partition_values: HashMap<String, String>,
}

pub type ScanCallback<T> = fn(context: &mut T, file: ScanFile);

/// Request that the kernel call a callback on each valid file that needs to be read for the
/// scan.
Expand Down Expand Up @@ -176,7 +179,8 @@ impl<T> RowVisitor for ScanFileVisitor<'_, T> {
continue;
}
// Since path column is required, use it to detect presence of an Add action
if let Some(path) = getters[0].get_opt(row_index, "scanFile.path")? {
let path: Option<String> = getters[0].get_opt(row_index, "scanFile.path")?;
if let Some(path) = path {
let size = getters[1].get(row_index, "scanFile.size")?;
let stats: Option<String> = getters[3].get_opt(row_index, "scanFile.stats")?;
let stats: Option<Stats> =
Expand All @@ -195,14 +199,15 @@ impl<T> RowVisitor for ScanFileVisitor<'_, T> {
let dv_info = DvInfo { deletion_vector };
let partition_values =
getters[9].get(row_index, "scanFile.fileConstantValues.partitionValues")?;
(self.callback)(
&mut self.context,
path,

let scan_file = ScanFile {
path: path.to_string(),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's already a string

Suggested change
path: path.to_string(),
path,

size,
stats,
dv_info,
partition_values,
)
};
(self.callback)(&mut self.context, scan_file)
}
}
Ok(())
Expand All @@ -211,36 +216,30 @@ impl<T> RowVisitor for ScanFileVisitor<'_, T> {

#[cfg(test)]
mod tests {
use std::collections::HashMap;

use crate::scan::test_utils::{add_batch_simple, run_with_validate_callback};

use super::{DvInfo, Stats};
use super::ScanFile;

#[derive(Clone)]
struct TestContext {
id: usize,
}

fn validate_visit(
context: &mut TestContext,
path: &str,
size: i64,
stats: Option<Stats>,
dv_info: DvInfo,
part_vals: HashMap<String, String>,
) {
fn validate_visit(context: &mut TestContext, file: ScanFile) {
assert_eq!(
path,
file.path,
"part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet"
);
assert_eq!(size, 635);
assert!(stats.is_some());
assert_eq!(stats.as_ref().unwrap().num_records, 10);
assert_eq!(part_vals.get("date"), Some(&"2017-12-10".to_string()));
assert_eq!(part_vals.get("non-existent"), None);
assert!(dv_info.deletion_vector.is_some());
let dv = dv_info.deletion_vector.unwrap();
assert_eq!(file.size, 635);
assert!(file.stats.is_some());
assert_eq!(file.stats.as_ref().unwrap().num_records, 10);
assert_eq!(
file.partition_values.get("date"),
Some(&"2017-12-10".to_string())
);
assert_eq!(file.partition_values.get("non-existent"), None);
assert!(file.dv_info.deletion_vector.is_some());
let dv = file.dv_info.deletion_vector.unwrap();
assert_eq!(dv.unique_id(), "uvBn[lx{q8@P<9BNH/isA@1");
assert_eq!(context.id, 2);
}
Expand Down
27 changes: 4 additions & 23 deletions kernel/tests/read.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::collections::HashMap;
use std::ops::Not;
use std::path::PathBuf;
use std::sync::Arc;
Expand All @@ -11,7 +10,8 @@ use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
use delta_kernel::engine::default::DefaultEngine;
use delta_kernel::expressions::{column_expr, BinaryOperator, Expression};
use delta_kernel::scan::state::{visit_scan_files, DvInfo, Stats};
use delta_kernel::scan::state;
use delta_kernel::scan::state::visit_scan_files;
use delta_kernel::scan::{transform_to_logical, Scan};
use delta_kernel::schema::{DataType, Schema};
use delta_kernel::{Engine, FileMeta, Table};
Expand Down Expand Up @@ -335,27 +335,8 @@ fn read_with_execute(
Ok(())
}

struct ScanFile {
path: String,
size: i64,
dv_info: DvInfo,
partition_values: HashMap<String, String>,
}

fn scan_data_callback(
batches: &mut Vec<ScanFile>,
path: &str,
size: i64,
_stats: Option<Stats>,
dv_info: DvInfo,
partition_values: HashMap<String, String>,
) {
batches.push(ScanFile {
path: path.to_string(),
size,
dv_info,
partition_values,
});
fn scan_data_callback(files: &mut Vec<state::ScanFile>, file: state::ScanFile) {
files.push(file);
}

fn read_with_scan_data(
Expand Down
Loading