Part 4: read_table.c uses transform in ffi #614

Draft: nicklan wants to merge 28 commits into main from transform-expr

Commits (28):
6e0b1c9 also extract partitionValues (nicklan, Dec 9, 2024)
c53b7de checkpoint (nicklan, Dec 11, 2024)
9ac7173 Merge branch 'main' into transform-expr (nicklan, Dec 12, 2024)
f75b2e3 hey, it kinda works (nicklan, Dec 12, 2024)
4d1d4f7 Merge branch 'main' into transform-expr (nicklan, Dec 17, 2024)
c8cc84b undo change to ColumnType, will go a different direction (nicklan, Dec 17, 2024)
29ded0e use TransformExpr (nicklan, Dec 18, 2024)
9d4688c cleanup (nicklan, Dec 18, 2024)
631f403 Merge branch 'main' into transform-expr (nicklan, Dec 18, 2024)
f791167 optional transform (nicklan, Dec 18, 2024)
b7268e5 add initial tests (nicklan, Dec 18, 2024)
da5a9e8 adjust comments (nicklan, Dec 18, 2024)
e3fdfaa fix comment (nicklan, Dec 18, 2024)
e9a8d1c oops, fix ffi (nicklan, Dec 19, 2024)
b773614 cleanup examples (nicklan, Dec 19, 2024)
ebcb42d Actually use ExpressionRef (nicklan, Dec 19, 2024)
3a38785 Merge branch 'main' into transform-expr (nicklan, Dec 19, 2024)
58ad2a3 remove unused try_from (nicklan, Dec 19, 2024)
3d040f7 need transform if column mapping is enabled (nicklan, Dec 19, 2024)
d39322c checkpoint (nicklan, Dec 19, 2024)
3cb4746 pass through transforms in kernel (nicklan, Dec 19, 2024)
d23c4d0 pass through in ffi (missing final bit) (nicklan, Dec 19, 2024)
ff5d5fe fmt cleanup (nicklan, Dec 19, 2024)
fad4b45 use transform in execute (nicklan, Dec 19, 2024)
d0e7d9b multi-threaded reader uses transform (nicklan, Dec 19, 2024)
a740ffc read.rs uses transform (nicklan, Dec 19, 2024)
6d72b75 remove transform_to_logical :) (nicklan, Dec 19, 2024)
b961220 use transform in read_table (nicklan, Dec 20, 2024)
121 changes: 35 additions & 86 deletions ffi/examples/read-table/arrow.c
@@ -11,6 +11,7 @@ ArrowContext* init_arrow_context()
context->num_batches = 0;
context->batches = NULL;
context->cur_filter = NULL;
context->cur_transform = NULL;
return context;
}

@@ -50,86 +51,10 @@ static GArrowRecordBatch* get_record_batch(FFI_ArrowArray* array, GArrowSchema*
return record_batch;
}

// Add columns to a record batch for each partition. In a "real" engine we would want to parse the
// string values into the correct data type. This program just adds all partition columns as strings
// for simplicity
static GArrowRecordBatch* add_partition_columns(
GArrowRecordBatch* record_batch,
PartitionList* partition_cols,
const CStringMap* partition_values)
{
gint64 rows = garrow_record_batch_get_n_rows(record_batch);
gint64 cols = garrow_record_batch_get_n_columns(record_batch);
GArrowRecordBatch* cur_record_batch = record_batch;
GError* error = NULL;
for (uintptr_t i = 0; i < partition_cols->len; i++) {
char* col = partition_cols->cols[i];
guint pos = cols + i;
KernelStringSlice key = { col, strlen(col) };
char* partition_val = get_from_map(partition_values, key, allocate_string);
print_diag(
" Adding partition column '%s' with value '%s' at column %u\n",
col,
partition_val ? partition_val : "NULL",
pos);
GArrowStringArrayBuilder* builder = garrow_string_array_builder_new();
for (gint64 i = 0; i < rows; i++) {
if (partition_val) {
garrow_string_array_builder_append_string(builder, partition_val, &error);
} else {
garrow_array_builder_append_null((GArrowArrayBuilder*)builder, &error);
}
if (report_g_error("Can't append to partition column builder", error)) {
break;
}
}

if (partition_val) {
free(partition_val);
}

if (error != NULL) {
printf("Giving up on column %s\n", col);
g_error_free(error);
g_object_unref(builder);
error = NULL;
continue;
}

GArrowArray* partition_col = garrow_array_builder_finish((GArrowArrayBuilder*)builder, &error);
if (report_g_error("Can't build string array for parition column", error)) {
printf("Giving up on column %s\n", col);
g_error_free(error);
g_object_unref(builder);
error = NULL;
continue;
}
g_object_unref(builder);

GArrowDataType* string_data_type = (GArrowDataType*)garrow_string_data_type_new();
GArrowField* field = garrow_field_new(col, string_data_type);
GArrowRecordBatch* old_batch = cur_record_batch;
cur_record_batch = garrow_record_batch_add_column(old_batch, pos, field, partition_col, &error);
g_object_unref(old_batch);
g_object_unref(partition_col);
g_object_unref(string_data_type);
g_object_unref(field);
if (cur_record_batch == NULL) {
if (error != NULL) {
printf("Could not add column at %u: %s\n", pos, error->message);
g_error_free(error);
}
}
}
return cur_record_batch;
}

// append a batch to our context
static void add_batch_to_context(
ArrowContext* context,
ArrowFFIData* arrow_data,
PartitionList* partition_cols,
const CStringMap* partition_values)
ArrowFFIData* arrow_data)
{
GArrowSchema* schema = get_schema(&arrow_data->schema);
GArrowRecordBatch* record_batch = get_record_batch(&arrow_data->array, schema);
@@ -142,11 +67,6 @@ static void add_batch_to_context(
g_object_unref(context->cur_filter);
context->cur_filter = NULL;
}
record_batch = add_partition_columns(record_batch, partition_cols, partition_values);
if (record_batch == NULL) {
printf("Failed to add parition columns, not adding batch\n");
return;
}
context->batches = g_list_append(context->batches, record_batch);
context->num_batches++;
print_diag(
@@ -187,28 +107,56 @@ static GArrowBooleanArray* slice_to_arrow_bool_array(const KernelBoolSlice slice
return (GArrowBooleanArray*)ret;
}

static ExclusiveEngineData* apply_transform(
struct EngineContext* context,
ExclusiveEngineData* data) {
print_diag(" Applying transform\n");
SharedExpressionEvaluator* evaluator = get_evaluator(
context->engine,
context->read_schema, // input schema
context->arrow_context->cur_transform,
context->logical_schema); // output schema
ExternResultHandleExclusiveEngineData transformed_res = evaluate(
context->engine,
&data,
evaluator);
if (transformed_res.tag != OkHandleExclusiveEngineData) {
print_error("Failed to transform read data.", (Error*)transformed_res.err);
free_error((Error*)transformed_res.err);
return NULL;
[Review comment from a collaborator]: Don't we need to free data and evaluator regardless of whether this transform succeeded? (One possible fix is sketched after this file's diff.)
}
free_engine_data(data);
free_evaluator(evaluator);
return transformed_res.ok;
}

// This is the callback that will be called for each chunk of data read from the parquet file
static void visit_read_data(void* vcontext, ExclusiveEngineData* data)
{
print_diag(" Converting read data to arrow\n");
struct EngineContext* context = vcontext;
ExternResultArrowFFIData arrow_res = get_raw_arrow_data(data, context->engine);
ExclusiveEngineData* transformed = apply_transform(context, data);
if (!transformed) {
// TODO: What?
exit(-1);
}
ExternResultArrowFFIData arrow_res = get_raw_arrow_data(transformed, context->engine);
if (arrow_res.tag != OkArrowFFIData) {
print_error("Failed to get arrow data.", (Error*)arrow_res.err);
free_error((Error*)arrow_res.err);
exit(-1);
}
ArrowFFIData* arrow_data = arrow_res.ok;
add_batch_to_context(
context->arrow_context, arrow_data, context->partition_cols, context->partition_values);
add_batch_to_context(context->arrow_context, arrow_data);
free(arrow_data); // just frees the struct, the data and schema are freed/owned by add_batch_to_context
}

// We call this for each file we get called back to read in read_table.c::visit_callback
void c_read_parquet_file(
struct EngineContext* context,
const KernelStringSlice path,
const KernelBoolSlice selection_vector)
const KernelBoolSlice selection_vector,
const Expression* transform)
{
int full_len = strlen(context->table_root) + path.len + 1;
char* full_path = malloc(sizeof(char) * full_len);
@@ -233,6 +181,7 @@ }
}
context->arrow_context->cur_filter = sel_array;
}
context->arrow_context->cur_transform = transform;
ExclusiveFileReadResultIterator* read_iter = read_res.ok;
for (;;) {
ExternResultbool ok_res = read_result_next(read_iter, context, visit_read_data);
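The review comment above flags a leak in apply_transform: data and evaluator are only released when evaluation succeeds. A minimal sketch of one way to restructure the function so both are freed on every path, using only the FFI calls already present in this diff (evaluate borrows its input batch rather than consuming it, so the unconditional free is safe):

```c
/* Sketch only, not part of the PR: apply_transform restructured so that
 * `data` and `evaluator` are released on both the success and the
 * failure path. `evaluate` only borrows the input batch, so it is safe
 * to free `data` unconditionally once evaluation has returned. */
static ExclusiveEngineData* apply_transform_no_leak(
  struct EngineContext* context,
  ExclusiveEngineData* data)
{
  print_diag("  Applying transform\n");
  SharedExpressionEvaluator* evaluator = get_evaluator(
    context->engine,
    context->read_schema,                   // input schema
    context->arrow_context->cur_transform,  // transform expression
    context->logical_schema);               // output schema
  ExternResultHandleExclusiveEngineData transformed_res =
    evaluate(context->engine, &data, evaluator);
  free_engine_data(data);    // always release the input batch
  free_evaluator(evaluator); // always release the evaluator
  if (transformed_res.tag != OkHandleExclusiveEngineData) {
    print_error("Failed to transform read data.", (Error*)transformed_res.err);
    free_error((Error*)transformed_res.err);
    return NULL;
  }
  return transformed_res.ok;
}
```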
4 changes: 3 additions & 1 deletion ffi/examples/read-table/arrow.h
@@ -15,13 +15,15 @@ typedef struct ArrowContext
gsize num_batches;
GList* batches;
GArrowBooleanArray* cur_filter;
const Expression* cur_transform;
} ArrowContext;

ArrowContext* init_arrow_context(void);
void c_read_parquet_file(
struct EngineContext* context,
const KernelStringSlice path,
const KernelBoolSlice selection_vector);
const KernelBoolSlice selection_vector,
const Expression* transform);
void print_arrow_context(ArrowContext* context);
void free_arrow_context(ArrowContext* context);

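One thing the new cur_transform field makes possible is a NULL state: commit f791167 makes the transform optional, so a file that needs no transformation presumably arrives with no expression. A hedged sketch of the guard an engine could put in front of apply_transform (the helper name is hypothetical, not part of this PR):

```c
/* Hypothetical guard, not part of the PR: when the kernel supplies no
 * transform for a file, the physical data is already in its final shape,
 * so hand the batch through untouched instead of building an evaluator
 * around a NULL expression. */
static ExclusiveEngineData* maybe_apply_transform(
  struct EngineContext* context,
  ExclusiveEngineData* data)
{
  if (context->arrow_context->cur_transform == NULL) {
    return data; // nothing to do for this file
  }
  return apply_transform(context, data);
}
```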
15 changes: 10 additions & 5 deletions ffi/examples/read-table/read_table.c
@@ -28,7 +28,7 @@ void print_partition_info(struct EngineContext* context, const CStringMap* parti
for (uintptr_t i = 0; i < context->partition_cols->len; i++) {
char* col = context->partition_cols->cols[i];
KernelStringSlice key = { col, strlen(col) };
char* partition_val = get_from_map(partition_values, key, allocate_string);
char* partition_val = get_from_string_map(partition_values, key, allocate_string);
if (partition_val) {
print_diag(" partition '%s' here: %s\n", col, partition_val);
free(partition_val);
@@ -50,6 +50,7 @@ void scan_row_callback(
int64_t size,
const Stats* stats,
const DvInfo* dv_info,
const Expression* transform,
const CStringMap* partition_values)
{
(void)size; // not using this at the moment
@@ -76,7 +77,7 @@ void scan_row_callback(
context->partition_values = partition_values;
print_partition_info(context, partition_values);
#ifdef PRINT_ARROW_DATA
c_read_parquet_file(context, path, selection_vector);
c_read_parquet_file(context, path, selection_vector, transform);
#endif
free_bool_slice(selection_vector);
context->partition_values = NULL;
@@ -87,14 +88,15 @@ void scan_row_callback(
void do_visit_scan_data(
void* engine_context,
ExclusiveEngineData* engine_data,
KernelBoolSlice selection_vec)
KernelBoolSlice selection_vec,
const CTransformMap* transforms)
{
print_diag("\nScan iterator found some data to read\n Of this data, here is "
"a selection vector\n");
print_selection_vector(" ", &selection_vec);
// Ask kernel to iterate each individual file and call us back with extracted metadata
print_diag("Asking kernel to call us back for each scan row (file to read)\n");
visit_scan_data(engine_data, selection_vec, engine_context, scan_row_callback);
visit_scan_data(engine_data, selection_vec, transforms, engine_context, scan_row_callback);
free_bool_slice(selection_vec);
free_engine_data(engine_data);
}
@@ -272,10 +274,12 @@

SharedScan* scan = scan_res.ok;
SharedGlobalScanState* global_state = get_global_scan_state(scan);
SharedSchema* logical_schema = get_global_logical_schema(global_state);
SharedSchema* read_schema = get_global_read_schema(global_state);
PartitionList* partition_cols = get_partition_list(global_state);
struct EngineContext context = {
global_state,
logical_schema,
read_schema,
table_root,
engine,
@@ -320,7 +324,8 @@

free_kernel_scan_data(data_iter);
free_scan(scan);
free_global_read_schema(read_schema);
free_schema(logical_schema);
free_schema(read_schema);
free_global_scan_state(global_state);
free_snapshot(snapshot);
free_engine(engine);
1 change: 1 addition & 0 deletions ffi/examples/read-table/read_table.h
@@ -14,6 +14,7 @@ typedef struct PartitionList
struct EngineContext
{
SharedGlobalScanState* global_state;
SharedSchema* logical_schema;
SharedSchema* read_schema;
char* table_root;
SharedExternEngine* engine;
105 changes: 103 additions & 2 deletions ffi/src/engine_funcs.rs
@@ -2,7 +2,10 @@

use std::sync::Arc;

use delta_kernel::{schema::Schema, DeltaResult, FileDataReadResultIterator};
use delta_kernel::{
schema::{DataType, Schema, SchemaRef},
DeltaResult, EngineData, Expression, ExpressionEvaluator, FileDataReadResultIterator,
};
use delta_kernel_ffi_macros::handle_descriptor;
use tracing::debug;
use url::Url;
@@ -97,7 +100,7 @@ pub unsafe extern "C" fn free_read_result_iter(data: Handle<ExclusiveFileReadRes
/// Caller is responsible for calling with a valid `ExternEngineHandle` and `FileMeta`
#[no_mangle]
pub unsafe extern "C" fn read_parquet_file(
engine: Handle<SharedExternEngine>,
engine: Handle<SharedExternEngine>, // TODO Does this cause a free?
file: &FileMeta,
physical_schema: Handle<SharedSchema>,
) -> ExternResult<Handle<ExclusiveFileReadResultIterator>> {
@@ -130,3 +133,101 @@ fn read_parquet_file_impl(
});
Ok(res.into())
}

// Expression Eval

#[handle_descriptor(target=dyn ExpressionEvaluator, mutable=false)]
pub struct SharedExpressionEvaluator;

#[no_mangle]
pub unsafe extern "C" fn get_evaluator(
engine: Handle<SharedExternEngine>,
input_schema: Handle<SharedSchema>,
expression: &Expression,
// TODO: Make this a data_type, and give a way for c code to go between schema <-> datatype
output_type: Handle<SharedSchema>,
) -> Handle<SharedExpressionEvaluator> {
let engine = unsafe { engine.clone_as_arc() };
let input_schema = unsafe { input_schema.clone_as_arc() };
let output_type: DataType = output_type.as_ref().clone().into();
get_evaluator_impl(engine, input_schema, expression, output_type)
}

fn get_evaluator_impl(
extern_engine: Arc<dyn ExternEngine>,
input_schema: SchemaRef,
expression: &Expression,
output_type: DataType,
) -> Handle<SharedExpressionEvaluator> {
let engine = extern_engine.engine();
let evaluator = engine.get_expression_handler().get_evaluator(
input_schema,
expression.clone(),
output_type,
);
evaluator.into()
}

/// Free an evaluator
/// # Safety
///
/// Caller is responsible for passing a valid handle.
#[no_mangle]
pub unsafe extern "C" fn free_evaluator(evaluator: Handle<SharedExpressionEvaluator>) {
debug!("engine released evaluator");
evaluator.drop_handle();
}
#[no_mangle]
pub unsafe extern "C" fn evaluate(
engine: Handle<SharedExternEngine>,
batch: &mut Handle<ExclusiveEngineData>,
evaluator: Handle<SharedExpressionEvaluator>,
) -> ExternResult<Handle<ExclusiveEngineData>> {
let engine = unsafe { engine.clone_as_arc() };
let batch = unsafe { batch.as_mut() };
let evaluator = unsafe { evaluator.clone_as_arc() };
let res = evaluate_impl(batch, evaluator.as_ref());
res.into_extern_result(&engine.as_ref())
}

fn evaluate_impl(
batch: &dyn EngineData,
evaluator: &dyn ExpressionEvaluator,
) -> DeltaResult<Handle<ExclusiveEngineData>> {
let res = evaluator.evaluate(batch);
res.map(|d| d.into())
}

#[cfg(test)]
mod tests {
use super::get_evaluator;
use crate::{free_engine, tests::get_default_engine};
use delta_kernel::{
schema::{DataType, StructField, StructType},
Expression,
};
use std::sync::Arc;

#[test]
fn test_get_evaluator() {
let engine = get_default_engine();
let in_schema = Arc::new(StructType::new(vec![StructField::new(
"a",
DataType::LONG,
true,
)]));
let expr = Expression::literal(1);
let output_type = in_schema.clone();
unsafe {
get_evaluator(
engine.shallow_copy(),
in_schema.into(),
&expr,
output_type.into(),
);
free_engine(engine);
}
}
}
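
Taken together, get_evaluator, evaluate, and free_evaluator give C engines the same round trip that arrow.c's apply_transform performs. A compact lifecycle sketch, assuming the C declarations that cbindgen generates for the functions above and that the caller owns every handle passed in:

```c
/* Sketch only: the full lifecycle of the new expression-evaluation FFI.
 * Handle types come from the generated delta_kernel_ffi.h header. The
 * input `batch` is freed here because `evaluate` merely borrows it; the
 * returned batch (if any) must later be freed by the caller with
 * free_engine_data. */
ExclusiveEngineData* evaluate_once(
  SharedExternEngine* engine,
  SharedSchema* input_schema,
  const Expression* expr,
  SharedSchema* output_type, // a schema stands in for a data type, per the TODO above
  ExclusiveEngineData* batch)
{
  SharedExpressionEvaluator* evaluator =
    get_evaluator(engine, input_schema, expr, output_type);
  ExternResultHandleExclusiveEngineData res = evaluate(engine, &batch, evaluator);
  free_engine_data(batch);
  free_evaluator(evaluator);
  if (res.tag != OkHandleExclusiveEngineData) {
    free_error((Error*)res.err);
    return NULL;
  }
  return res.ok;
}
```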