Skip to content

Commit 7d9e3b2

Browse files
authored
chore: Prepare for DataFusion 48.0.0 (#1710)
1 parent: 7d46ff2 · commit: 7d9e3b2

File tree

11 files changed

+216
-224
lines changed

11 files changed

+216
-224
lines changed

native/Cargo.lock

Lines changed: 178 additions & 200 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

native/Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -34,11 +34,11 @@ edition = "2021"
3434
rust-version = "1.85"
3535

3636
[workspace.dependencies]
37-
arrow = { version = "55.0.0", features = ["prettyprint", "ffi", "chrono-tz"] }
37+
arrow = { version = "55.1.0", features = ["prettyprint", "ffi", "chrono-tz"] }
3838
async-trait = { version = "0.1" }
3939
bytes = { version = "1.10.0" }
40-
parquet = { version = "55.0.0", default-features = false, features = ["experimental"] }
41-
datafusion = { version = "47.0.0", default-features = false, features = ["unicode_expressions", "crypto_expressions", "nested_expressions", "parquet"] }
40+
parquet = { version = "55.1.0", default-features = false, features = ["experimental"] }
41+
datafusion = { git = "https://github.com/apache/datafusion", rev = "efd9587", default-features = false, features = ["unicode_expressions", "crypto_expressions", "nested_expressions", "parquet"] }
4242
datafusion-comet-spark-expr = { path = "spark-expr" }
4343
datafusion-comet-proto = { path = "proto" }
4444
chrono = { version = "0.4", default-features = false, features = ["clock"] }

native/core/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -81,7 +81,7 @@ jni = { version = "0.21", features = ["invocation"] }
8181
lazy_static = "1.4"
8282
assertables = "9"
8383
hex = "0.4.3"
84-
datafusion-functions-nested = { version = "47.0.0" }
84+
datafusion-functions-nested = { git = "https://github.com/apache/datafusion", rev = "efd9587" }
8585

8686
[features]
8787
default = []

native/core/src/execution/operators/copy.rs

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -147,8 +147,8 @@ impl ExecutionPlan for CopyExec {
147147
)))
148148
}
149149

150-
fn statistics(&self) -> DataFusionResult<Statistics> {
151-
self.input.statistics()
150+
fn partition_statistics(&self, partition: Option<usize>) -> DataFusionResult<Statistics> {
151+
self.input.partition_statistics(partition)
152152
}
153153

154154
fn properties(&self) -> &PlanProperties {

native/core/src/execution/operators/filter.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -166,7 +166,7 @@ impl FilterExec {
166166
predicate: &Arc<dyn PhysicalExpr>,
167167
default_selectivity: u8,
168168
) -> Result<Statistics> {
169-
let input_stats = input.statistics()?;
169+
let input_stats = input.partition_statistics(None)?;
170170
let schema = input.schema();
171171
if !check_support(predicate, &schema) {
172172
let selectivity = default_selectivity as f64 / 100.0;

native/core/src/execution/planner.rs

Lines changed: 14 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -78,7 +78,7 @@ use datafusion::common::{
7878
use datafusion::datasource::listing::PartitionedFile;
7979
use datafusion::logical_expr::type_coercion::other::get_coerce_type_for_case_expression;
8080
use datafusion::logical_expr::{
81-
AggregateUDF, ReturnTypeArgs, WindowFrame, WindowFrameBound, WindowFrameUnits,
81+
AggregateUDF, ReturnFieldArgs, WindowFrame, WindowFrameBound, WindowFrameUnits,
8282
WindowFunctionDefinition,
8383
};
8484
use datafusion::physical_expr::expressions::{Literal, StatsType};
@@ -886,7 +886,7 @@ impl PhysicalPlanner {
886886
func_name,
887887
fun_expr,
888888
vec![left, right],
889-
data_type,
889+
Field::new(func_name, data_type, true),
890890
)))
891891
}
892892
_ => Ok(Arc::new(BinaryExpr::new(left, op, right))),
@@ -2191,6 +2191,12 @@ impl PhysicalPlanner {
21912191
.coerce_types(&input_expr_types)
21922192
.unwrap_or_else(|_| input_expr_types.clone());
21932193

2194+
let arg_fields = coerced_types
2195+
.iter()
2196+
.enumerate()
2197+
.map(|(i, dt)| Field::new(format!("arg{i}"), dt.clone(), true))
2198+
.collect::<Vec<_>>();
2199+
21942200
// TODO this should try and find scalar
21952201
let arguments = args
21962202
.iter()
@@ -2202,18 +2208,16 @@ impl PhysicalPlanner {
22022208
})
22032209
.collect::<Vec<_>>();
22042210

2205-
let nullables = arguments.iter().map(|_| true).collect::<Vec<_>>();
2206-
2207-
let args = ReturnTypeArgs {
2208-
arg_types: &coerced_types,
2211+
let args = ReturnFieldArgs {
2212+
arg_fields: &arg_fields,
22092213
scalar_arguments: &arguments,
2210-
nullables: &nullables,
22112214
};
22122215

22132216
let data_type = func
22142217
.inner()
2215-
.return_type_from_args(args)?
2216-
.return_type()
2218+
.return_field_from_args(args)?
2219+
.clone()
2220+
.data_type()
22172221
.clone();
22182222

22192223
(data_type, coerced_types)
@@ -2246,7 +2250,7 @@ impl PhysicalPlanner {
22462250
fun_name,
22472251
fun_expr,
22482252
args.to_vec(),
2249-
data_type,
2253+
Field::new(fun_name, data_type, true),
22502254
));
22512255

22522256
Ok(scalar_expr)

native/core/src/execution/shuffle/shuffle_writer.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -141,7 +141,7 @@ impl ExecutionPlan for ShuffleWriterExec {
141141
}
142142

143143
fn statistics(&self) -> Result<Statistics> {
144-
self.input.statistics()
144+
self.input.partition_statistics(None)
145145
}
146146

147147
fn properties(&self) -> &PlanProperties {

native/core/src/parquet/parquet_exec.rs

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -78,8 +78,8 @@ pub(crate) fn init_datasource_exec(
7878
))
7979
});
8080

81-
if let (Some(filter), Some(data_schema)) = (cnf_data_filters, &data_schema) {
82-
parquet_source = parquet_source.with_predicate(Arc::clone(data_schema), filter);
81+
if let Some(filter) = cnf_data_filters {
82+
parquet_source = parquet_source.with_predicate(filter);
8383
}
8484
}
8585

native/core/src/parquet/schema_adapter.rs

Lines changed: 8 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -20,6 +20,7 @@
2020
use crate::parquet::parquet_support::{spark_parquet_convert, SparkParquetOptions};
2121
use arrow::array::{new_null_array, RecordBatch, RecordBatchOptions};
2222
use arrow::datatypes::{Schema, SchemaRef};
23+
use datafusion::common::ColumnStatistics;
2324
use datafusion::datasource::schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper};
2425
use datafusion::physical_plan::ColumnarValue;
2526
use std::sync::Arc;
@@ -226,6 +227,13 @@ impl SchemaMapper for SchemaMapping {
226227
let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?;
227228
Ok(record_batch)
228229
}
230+
231+
fn map_column_statistics(
232+
&self,
233+
_file_col_statistics: &[ColumnStatistics],
234+
) -> datafusion::common::Result<Vec<ColumnStatistics>> {
235+
Ok(vec![])
236+
}
229237
}
230238

231239
#[cfg(test)]

native/spark-expr/src/conversion_funcs/cast.rs

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -38,7 +38,7 @@ use arrow::{
3838
record_batch::RecordBatch,
3939
util::display::FormatOptions,
4040
};
41-
use chrono::{NaiveDate, NaiveDateTime, TimeZone, Timelike};
41+
use chrono::{DateTime, NaiveDate, TimeZone, Timelike};
4242
use datafusion::common::{
4343
cast::as_generic_string_array, internal_err, Result as DataFusionResult, ScalarValue,
4444
};
@@ -2108,7 +2108,7 @@ fn date_parser(date_str: &str, eval_mode: EvalMode) -> SparkResult<Option<i32>>
21082108
) {
21092109
Some(date) => {
21102110
let duration_since_epoch = date
2111-
.signed_duration_since(NaiveDateTime::UNIX_EPOCH.date())
2111+
.signed_duration_since(DateTime::UNIX_EPOCH.naive_utc().date())
21122112
.num_days();
21132113
Ok(Some(duration_since_epoch.to_i32().unwrap()))
21142114
}

native/spark-expr/src/hash_funcs/sha2.rs

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -17,7 +17,7 @@
1717

1818
use crate::math_funcs::hex::hex_strings;
1919
use arrow::array::{Array, StringArray};
20-
use arrow::datatypes::DataType;
20+
use arrow::datatypes::{DataType, Field};
2121
use datafusion::common::cast::as_binary_array;
2222
use datafusion::common::{exec_err, DataFusionError, ScalarValue};
2323
use datafusion::functions::crypto::{sha224, sha256, sha384, sha512};
@@ -55,10 +55,12 @@ fn wrap_digest_result_as_hex_string(
5555
ColumnarValue::Array(array) => array.len(),
5656
ColumnarValue::Scalar(_) => 1,
5757
};
58+
let return_field = Field::new("foo", DataType::Utf8, false);
5859
let value = digest.invoke_with_args(ScalarFunctionArgs {
5960
args: args.into(),
61+
arg_fields: vec![],
6062
number_rows: row_count,
61-
return_type: &DataType::Utf8,
63+
return_field: &return_field,
6264
})?;
6365
match value {
6466
ColumnarValue::Array(array) => {

Commit comments: 0