Skip to content

Commit 6df59bd

Browse files
authored
chore: Upgrade to DataFusion 44.0.0-rc2 (#1154) (#1272)
1 parent b49c17b commit 6df59bd

35 files changed

+718
-1016
lines changed

native/Cargo.lock

Lines changed: 295 additions & 287 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

native/Cargo.toml

Lines changed: 15 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -33,20 +33,21 @@ edition = "2021"
3333
rust-version = "1.79"
3434

3535
[workspace.dependencies]
36-
arrow = { version = "53.2.0", features = ["prettyprint", "ffi", "chrono-tz"] }
37-
arrow-array = { version = "53.2.0" }
38-
arrow-buffer = { version = "53.2.0" }
39-
arrow-data = { version = "53.2.0" }
40-
arrow-schema = { version = "53.2.0" }
41-
parquet = { version = "53.2.0", default-features = false, features = ["experimental"] }
42-
datafusion = { version = "43.0.0", default-features = false, features = ["unicode_expressions", "crypto_expressions", "parquet"] }
43-
datafusion-common = { version = "43.0.0" }
44-
datafusion-functions = { version = "43.0.0", features = ["crypto_expressions"] }
45-
datafusion-functions-nested = { version = "43.0.0", default-features = false }
46-
datafusion-expr = { version = "43.0.0", default-features = false }
47-
datafusion-execution = { version = "43.0.0", default-features = false }
48-
datafusion-physical-plan = { version = "43.0.0", default-features = false }
49-
datafusion-physical-expr = { version = "43.0.0", default-features = false }
36+
arrow = { version = "53.3.0", features = ["prettyprint", "ffi", "chrono-tz"] }
37+
arrow-array = { version = "53.3.0" }
38+
arrow-buffer = { version = "53.3.0" }
39+
arrow-data = { version = "53.3.0" }
40+
arrow-schema = { version = "53.3.0" }
41+
parquet = { version = "53.3.0", default-features = false, features = ["experimental"] }
42+
datafusion = { version = "44.0.0", default-features = false, features = ["unicode_expressions", "crypto_expressions"] }
43+
datafusion-common = { version = "44.0.0", default-features = false }
44+
datafusion-functions = { version = "44.0.0", default-features = false, features = ["crypto_expressions"] }
45+
datafusion-functions-nested = { version = "44.0.0", default-features = false }
46+
datafusion-expr = { version = "44.0.0", default-features = false }
47+
datafusion-expr-common = { version = "44.0.0", default-features = false }
48+
datafusion-execution = { version = "44.0.0", default-features = false }
49+
datafusion-physical-plan = { version = "44.0.0", default-features = false }
50+
datafusion-physical-expr = { version = "44.0.0", default-features = false }
5051
datafusion-comet-spark-expr = { path = "spark-expr", version = "0.5.0" }
5152
datafusion-comet-proto = { path = "proto", version = "0.5.0" }
5253
chrono = { version = "0.4", default-features = false, features = ["clock"] }

native/core/src/execution/expressions/bloom_filter_might_contain.rs

Lines changed: 19 additions & 27 deletions
Original file line number | Diff line number | Diff line change
@@ -19,26 +19,37 @@ use crate::{execution::util::spark_bloom_filter::SparkBloomFilter, parquet::data
1919
use arrow::record_batch::RecordBatch;
2020
use arrow_array::cast::as_primitive_array;
2121
use arrow_schema::{DataType, Schema};
22-
use datafusion::physical_expr_common::physical_expr::down_cast_any_ref;
2322
use datafusion::physical_plan::ColumnarValue;
2423
use datafusion_common::{internal_err, Result, ScalarValue};
2524
use datafusion_physical_expr::PhysicalExpr;
26-
use std::{
27-
any::Any,
28-
fmt::Display,
29-
hash::{Hash, Hasher},
30-
sync::Arc,
31-
};
25+
use std::hash::Hash;
26+
use std::{any::Any, fmt::Display, sync::Arc};
3227

3328
/// A physical expression that checks if a value might be in a bloom filter. It corresponds to the
3429
/// Spark's `BloomFilterMightContain` expression.
35-
#[derive(Debug, Hash)]
30+
#[derive(Debug, Eq)]
3631
pub struct BloomFilterMightContain {
3732
pub bloom_filter_expr: Arc<dyn PhysicalExpr>,
3833
pub value_expr: Arc<dyn PhysicalExpr>,
3934
bloom_filter: Option<SparkBloomFilter>,
4035
}
4136

37+
impl Hash for BloomFilterMightContain {
38+
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
39+
self.bloom_filter_expr.hash(state);
40+
self.value_expr.hash(state);
41+
self.bloom_filter.hash(state);
42+
}
43+
}
44+
45+
impl PartialEq for BloomFilterMightContain {
46+
fn eq(&self, other: &Self) -> bool {
47+
self.bloom_filter_expr.eq(&other.bloom_filter_expr)
48+
&& self.value_expr.eq(&other.value_expr)
49+
&& self.bloom_filter.eq(&other.bloom_filter)
50+
}
51+
}
52+
4253
impl Display for BloomFilterMightContain {
4354
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
4455
write!(
@@ -49,18 +60,6 @@ impl Display for BloomFilterMightContain {
4960
}
5061
}
5162

52-
impl PartialEq<dyn Any> for BloomFilterMightContain {
53-
fn eq(&self, _other: &dyn Any) -> bool {
54-
down_cast_any_ref(_other)
55-
.downcast_ref::<Self>()
56-
.map(|other| {
57-
self.bloom_filter_expr.eq(&other.bloom_filter_expr)
58-
&& self.value_expr.eq(&other.value_expr)
59-
})
60-
.unwrap_or(false)
61-
}
62-
}
63-
6463
fn evaluate_bloom_filter(
6564
bloom_filter_expr: &Arc<dyn PhysicalExpr>,
6665
) -> Result<Option<SparkBloomFilter>> {
@@ -141,11 +140,4 @@ impl PhysicalExpr for BloomFilterMightContain {
141140
Arc::clone(&children[1]),
142141
)?))
143142
}
144-
145-
fn dyn_hash(&self, state: &mut dyn Hasher) {
146-
let mut s = state;
147-
self.bloom_filter_expr.hash(&mut s);
148-
self.value_expr.hash(&mut s);
149-
self.hash(&mut s);
150-
}
151143
}

native/core/src/execution/expressions/subquery.rs

Lines changed: 2 additions & 21 deletions
Original file line number | Diff line number | Diff line change
@@ -22,7 +22,6 @@ use crate::{
2222
use arrow_array::RecordBatch;
2323
use arrow_schema::{DataType, Schema, TimeUnit};
2424
use datafusion::logical_expr::ColumnarValue;
25-
use datafusion::physical_expr_common::physical_expr::down_cast_any_ref;
2625
use datafusion_common::{internal_err, ScalarValue};
2726
use datafusion_physical_expr::PhysicalExpr;
2827
use jni::{
@@ -32,11 +31,11 @@ use jni::{
3231
use std::{
3332
any::Any,
3433
fmt::{Display, Formatter},
35-
hash::{Hash, Hasher},
34+
hash::Hash,
3635
sync::Arc,
3736
};
3837

39-
#[derive(Debug, Hash)]
38+
#[derive(Debug, Hash, PartialEq, Eq)]
4039
pub struct Subquery {
4140
/// The ID of the execution context that owns this subquery. We use this ID to retrieve the
4241
/// subquery result.
@@ -63,19 +62,6 @@ impl Display for Subquery {
6362
}
6463
}
6564

66-
impl PartialEq<dyn Any> for Subquery {
67-
fn eq(&self, other: &dyn Any) -> bool {
68-
down_cast_any_ref(other)
69-
.downcast_ref::<Self>()
70-
.map(|x| {
71-
self.id.eq(&x.id)
72-
&& self.data_type.eq(&x.data_type)
73-
&& self.exec_context_id.eq(&x.exec_context_id)
74-
})
75-
.unwrap_or(false)
76-
}
77-
}
78-
7965
impl PhysicalExpr for Subquery {
8066
fn as_any(&self) -> &dyn Any {
8167
self
@@ -209,9 +195,4 @@ impl PhysicalExpr for Subquery {
209195
) -> datafusion_common::Result<Arc<dyn PhysicalExpr>> {
210196
Ok(self)
211197
}
212-
213-
fn dyn_hash(&self, state: &mut dyn Hasher) {
214-
let mut s = state;
215-
self.hash(&mut s)
216-
}
217198
}

native/core/src/execution/jni_api.rs

Lines changed: 4 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -21,10 +21,7 @@ use super::{serde, utils::SparkArrowConvert, CometMemoryPool};
2121
use arrow::datatypes::DataType as ArrowDataType;
2222
use arrow_array::RecordBatch;
2323
use datafusion::{
24-
execution::{
25-
disk_manager::DiskManagerConfig,
26-
runtime_env::{RuntimeConfig, RuntimeEnv},
27-
},
24+
execution::{disk_manager::DiskManagerConfig, runtime_env::RuntimeEnv},
2825
physical_plan::{display::DisplayableExecutionPlan, SendableRecordBatchStream},
2926
prelude::{SessionConfig, SessionContext},
3027
};
@@ -51,6 +48,7 @@ use crate::{
5148
};
5249
use datafusion_comet_proto::spark_operator::Operator;
5350
use datafusion_common::ScalarValue;
51+
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
5452
use futures::stream::StreamExt;
5553
use jni::{
5654
objects::GlobalRef,
@@ -191,7 +189,7 @@ fn prepare_datafusion_session_context(
191189
memory_fraction: f64,
192190
comet_task_memory_manager: Arc<GlobalRef>,
193191
) -> CometResult<SessionContext> {
194-
let mut rt_config = RuntimeConfig::new().with_disk_manager(DiskManagerConfig::NewOs);
192+
let mut rt_config = RuntimeEnvBuilder::new().with_disk_manager(DiskManagerConfig::NewOs);
195193

196194
// Check if we are using unified memory manager integrated with Spark.
197195
if use_unified_memory_manager {
@@ -219,6 +217,7 @@ fn prepare_datafusion_session_context(
219217
&ScalarValue::Float64(Some(1.1)),
220218
);
221219

220+
#[allow(deprecated)]
222221
let runtime = RuntimeEnv::try_new(rt_config)?;
223222

224223
let mut session_ctx = SessionContext::new_with_config_rt(session_config, Arc::new(runtime));

native/core/src/execution/operators/copy.rs

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -30,6 +30,7 @@ use arrow_array::{
3030
use arrow_data::transform::MutableArrayData;
3131
use arrow_schema::{ArrowError, DataType, Field, FieldRef, Schema, SchemaRef};
3232

33+
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
3334
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
3435
use datafusion::{execution::TaskContext, physical_expr::*, physical_plan::*};
3536
use datafusion_common::{arrow_datafusion_err, DataFusionError, Result as DataFusionResult};
@@ -78,7 +79,8 @@ impl CopyExec {
7879
let cache = PlanProperties::new(
7980
EquivalenceProperties::new(Arc::clone(&schema)),
8081
Partitioning::UnknownPartitioning(1),
81-
ExecutionMode::Bounded,
82+
EmissionType::Final,
83+
Boundedness::Bounded,
8284
);
8385

8486
Self {

native/core/src/execution/operators/expand.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,11 @@
1717

1818
use arrow_array::{RecordBatch, RecordBatchOptions};
1919
use arrow_schema::SchemaRef;
20+
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
2021
use datafusion::{
2122
execution::TaskContext,
2223
physical_plan::{
23-
DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, PlanProperties,
24+
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
2425
RecordBatchStream, SendableRecordBatchStream,
2526
},
2627
};
@@ -54,7 +55,8 @@ impl ExpandExec {
5455
let cache = PlanProperties::new(
5556
EquivalenceProperties::new(Arc::clone(&schema)),
5657
Partitioning::UnknownPartitioning(1),
57-
ExecutionMode::Bounded,
58+
EmissionType::Final,
59+
Boundedness::Bounded,
5860
);
5961

6062
Self {

native/core/src/execution/operators/filter.rs

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -210,7 +210,8 @@ impl FilterExec {
210210
Ok(PlanProperties::new(
211211
eq_properties,
212212
input.output_partitioning().clone(), // Output Partitioning
213-
input.execution_mode(), // Execution Mode
213+
input.pipeline_behavior(),
214+
input.boundedness(),
214215
))
215216
}
216217
}

native/core/src/execution/operators/scan.rs

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -28,6 +28,7 @@ use arrow_data::ffi::FFI_ArrowArray;
2828
use arrow_data::ArrayData;
2929
use arrow_schema::ffi::FFI_ArrowSchema;
3030
use arrow_schema::{DataType, Field, Schema, SchemaRef};
31+
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
3132
use datafusion::physical_plan::metrics::{
3233
BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, Time,
3334
};
@@ -122,7 +123,8 @@ impl ScanExec {
122123
// The partitioning is not important because we are not using DataFusion's
123124
// query planner or optimizer
124125
Partitioning::UnknownPartitioning(1),
125-
ExecutionMode::Bounded,
126+
EmissionType::Final,
127+
Boundedness::Bounded,
126128
);
127129

128130
Ok(Self {

0 commit comments

Comments (0)