Skip to content

Commit 27b4e81

Browse files
authored
[comet-parquet-exec] Move type conversion logic for ParquetExec out of Cast expression. (#1229)
* Copy cast.rs logic to a new parquet_support.rs. Remove Parquet dependencies on cast.rs.
* Move parquet_support and schema_adapter to parquet folder.
* Add fields to SparkParquetOptions.
1 parent 01b5917 commit 27b4e81

File tree

7 files changed

+2404
-58
lines changed

7 files changed

+2404
-58
lines changed

native/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

native/core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ datafusion-comet-spark-expr = { workspace = true }
7979
datafusion-comet-proto = { workspace = true }
8080
object_store = { workspace = true }
8181
url = { workspace = true }
82+
chrono = { workspace = true }
8283

8384
[dev-dependencies]
8485
pprof = { version = "0.13.0", features = ["flamegraph"] }

native/core/src/execution/planner.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -65,13 +65,13 @@ use datafusion::{
6565
},
6666
prelude::SessionContext,
6767
};
68-
use datafusion_comet_spark_expr::{
69-
create_comet_physical_fun, create_negate_expr, SparkSchemaAdapterFactory,
70-
};
68+
use datafusion_comet_spark_expr::{create_comet_physical_fun, create_negate_expr};
7169
use datafusion_functions_nested::concat::ArrayAppend;
7270
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
7371

7472
use crate::execution::spark_plan::SparkPlan;
73+
use crate::parquet::parquet_support::SparkParquetOptions;
74+
use crate::parquet::schema_adapter::SparkSchemaAdapterFactory;
7575
use datafusion::datasource::listing::PartitionedFile;
7676
use datafusion::datasource::physical_plan::parquet::ParquetExecBuilder;
7777
use datafusion::datasource::physical_plan::FileScanConfig;
@@ -1156,13 +1156,14 @@ impl PhysicalPlanner {
11561156
table_parquet_options.global.pushdown_filters = true;
11571157
table_parquet_options.global.reorder_filters = true;
11581158

1159-
let mut spark_cast_options = SparkCastOptions::new(EvalMode::Legacy, "UTC", false);
1160-
spark_cast_options.allow_cast_unsigned_ints = true;
1159+
let mut spark_parquet_options =
1160+
SparkParquetOptions::new(EvalMode::Legacy, "UTC", false);
1161+
spark_parquet_options.allow_cast_unsigned_ints = true;
11611162

11621163
let mut builder = ParquetExecBuilder::new(file_scan_config)
11631164
.with_table_parquet_options(table_parquet_options)
11641165
.with_schema_adapter_factory(Arc::new(SparkSchemaAdapterFactory::new(
1165-
spark_cast_options,
1166+
spark_parquet_options,
11661167
)));
11671168

11681169
if let Some(filter) = cnf_data_filters {

native/core/src/parquet/mod.rs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@ pub use mutable_vector::*;
2121

2222
#[macro_use]
2323
pub mod util;
24+
pub mod parquet_support;
2425
pub mod read;
26+
pub mod schema_adapter;
2527

2628
use std::task::Poll;
2729
use std::{boxed::Box, ptr::NonNull, sync::Arc};
@@ -40,27 +42,28 @@ use jni::{
4042
},
4143
};
4244

45+
use self::util::jni::TypePromotionInfo;
4346
use crate::execution::operators::ExecutionError;
4447
use crate::execution::utils::SparkArrowConvert;
4548
use crate::parquet::data_type::AsBytes;
49+
use crate::parquet::schema_adapter::SparkSchemaAdapterFactory;
4650
use arrow::buffer::{Buffer, MutableBuffer};
4751
use arrow_array::{Array, RecordBatch};
4852
use datafusion::datasource::listing::PartitionedFile;
4953
use datafusion::datasource::physical_plan::parquet::ParquetExecBuilder;
5054
use datafusion::datasource::physical_plan::FileScanConfig;
5155
use datafusion::physical_plan::ExecutionPlan;
52-
use datafusion_comet_spark_expr::{EvalMode, SparkCastOptions, SparkSchemaAdapterFactory};
56+
use datafusion_comet_spark_expr::EvalMode;
5357
use datafusion_common::config::TableParquetOptions;
5458
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
5559
use futures::{poll, StreamExt};
5660
use jni::objects::{JBooleanArray, JByteArray, JLongArray, JPrimitiveArray, JString, ReleaseMode};
5761
use jni::sys::jstring;
5862
use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
63+
use parquet_support::SparkParquetOptions;
5964
use read::ColumnReader;
6065
use util::jni::{convert_column_descriptor, convert_encoding, deserialize_schema, get_file_path};
6166

62-
use self::util::jni::TypePromotionInfo;
63-
6467
/// Parquet read context maintained across multiple JNI calls.
6568
struct Context {
6669
pub column_reader: ColumnReader,
@@ -679,13 +682,13 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat
679682
table_parquet_options.global.pushdown_filters = true;
680683
table_parquet_options.global.reorder_filters = true;
681684

682-
let mut spark_cast_options = SparkCastOptions::new(EvalMode::Legacy, "UTC", false);
683-
spark_cast_options.allow_cast_unsigned_ints = true;
685+
let mut spark_parquet_options = SparkParquetOptions::new(EvalMode::Legacy, "UTC", false);
686+
spark_parquet_options.allow_cast_unsigned_ints = true;
684687

685688
let builder2 = ParquetExecBuilder::new(file_scan_config)
686689
.with_table_parquet_options(table_parquet_options)
687690
.with_schema_adapter_factory(Arc::new(SparkSchemaAdapterFactory::new(
688-
spark_cast_options,
691+
spark_parquet_options,
689692
)));
690693

691694
//TODO: (ARROW NATIVE) - predicate pushdown??

0 commit comments

Comments (0)