
Commit b8be7b7

chore: Upgrade to datafusion 47.0.0-rc1 and arrow-rs 55.0.0 (#1563)
1 parent 5ed43aa commit b8be7b7

38 files changed (+689, -401 lines)

native/Cargo.lock

Lines changed: 427 additions & 279 deletions
Some generated files are not rendered by default.

native/Cargo.toml

Lines changed: 4 additions & 4 deletions
@@ -34,11 +34,11 @@ edition = "2021"
 rust-version = "1.85"
 
 [workspace.dependencies]
-arrow = { version = "54.2.0", features = ["prettyprint", "ffi", "chrono-tz"] }
+arrow = { version = "55.0.0", features = ["prettyprint", "ffi", "chrono-tz"] }
 async-trait = { version = "0.1" }
 bytes = { version = "1.10.0" }
-parquet = { version = "54.2.0", default-features = false, features = ["experimental"] }
-datafusion = { version = "46.0.0", default-features = false, features = ["unicode_expressions", "crypto_expressions", "nested_expressions", "parquet"] }
+parquet = { version = "55.0.0", default-features = false, features = ["experimental"] }
+datafusion = { git = "https://github.com/apache/datafusion", rev = "47.0.0-rc1", default-features = false, features = ["unicode_expressions", "crypto_expressions", "nested_expressions", "parquet"] }
 datafusion-comet-spark-expr = { path = "spark-expr", version = "0.8.0" }
 datafusion-comet-proto = { path = "proto", version = "0.8.0" }
 chrono = { version = "0.4", default-features = false, features = ["clock"] }
@@ -48,7 +48,7 @@ num = "0.4"
 rand = "0.8"
 regex = "1.9.6"
 thiserror = "1"
-object_store = { version = "0.11.0", features = ["gcp", "azure", "aws", "http"] }
+object_store = { version = "0.12.0", features = ["gcp", "azure", "aws", "http"] }
 url = "2.2"
 
 [profile.release]

native/core/Cargo.toml

Lines changed: 1 addition & 1 deletion
@@ -77,7 +77,7 @@ jni = { version = "0.21", features = ["invocation"] }
 lazy_static = "1.4"
 assertables = "7"
 hex = "0.4.3"
-datafusion-functions-nested = "46.0.0"
+datafusion-functions-nested = { git = "https://github.com/apache/datafusion", rev = "47.0.0-rc1" }
 
 [features]
 default = []

native/core/src/execution/expressions/bloom_filter_might_contain.rs

Lines changed: 5 additions & 0 deletions
@@ -22,6 +22,7 @@ use arrow::record_batch::RecordBatch;
 use datafusion::common::{internal_err, Result, ScalarValue};
 use datafusion::physical_expr::PhysicalExpr;
 use datafusion::physical_plan::ColumnarValue;
+use std::fmt::Formatter;
 use std::hash::Hash;
 use std::{any::Any, fmt::Display, sync::Arc};
 
@@ -140,4 +141,8 @@ impl PhysicalExpr for BloomFilterMightContain {
             Arc::clone(&children[1]),
         )?))
     }
+
+    fn fmt_sql(&self, _: &mut Formatter<'_>) -> std::fmt::Result {
+        unimplemented!()
+    }
 }

native/core/src/execution/expressions/subquery.rs

Lines changed: 4 additions & 0 deletions
@@ -67,6 +67,10 @@ impl PhysicalExpr for Subquery {
         self
     }
 
+    fn fmt_sql(&self, _: &mut Formatter<'_>) -> std::fmt::Result {
+        unimplemented!()
+    }
+
     fn data_type(&self, _: &Schema) -> datafusion::common::Result<DataType> {
         Ok(self.data_type.clone())
     }
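DataFusion 47 makes `fmt_sql` a required method on `PhysicalExpr`, which is why both `BloomFilterMightContain` and `Subquery` gain an `unimplemented!()` stub above. As a rough illustration of the Formatter-based pattern the new method follows, here is a self-contained sketch; the trait, struct, and wrapper below are illustrative stand-ins, not the actual DataFusion trait:

```rust
use std::fmt::{self, Display, Formatter};

// Illustrative stand-in for the method shape added in DataFusion 47:
// an expression renders a SQL-like form directly into a Formatter.
trait SqlDisplay {
    fn fmt_sql(&self, f: &mut Formatter<'_>) -> fmt::Result;
}

struct MightContain {
    column: String,
}

impl SqlDisplay for MightContain {
    fn fmt_sql(&self, f: &mut Formatter<'_>) -> fmt::Result {
        // A real implementation would emit the expression's SQL form;
        // the Comet stubs above simply call unimplemented!() for now.
        write!(f, "might_contain({})", self.column)
    }
}

// Small adapter so the Formatter-based method can be driven by format!/println!.
struct SqlWrapper<'a>(&'a dyn SqlDisplay);

impl Display for SqlWrapper<'_> {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        self.0.fmt_sql(f)
    }
}

fn main() {
    let expr = MightContain { column: "id".to_string() };
    println!("{}", SqlWrapper(&expr)); // prints: might_contain(id)
}
```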

native/core/src/execution/operators/copy.rs

Lines changed: 1 addition & 0 deletions
@@ -99,6 +99,7 @@ impl DisplayAs for CopyExec {
             DisplayFormatType::Default | DisplayFormatType::Verbose => {
                 write!(f, "CopyExec [{:?}]", self.mode)
             }
+            DisplayFormatType::TreeRender => unimplemented!(),
         }
     }
 }

native/core/src/execution/operators/expand.rs

Lines changed: 1 addition & 0 deletions
@@ -85,6 +85,7 @@ impl DisplayAs for ExpandExec {
 
                 Ok(())
             }
+            DisplayFormatType::TreeRender => unimplemented!(),
         }
     }
 }

native/core/src/execution/operators/filter.rs

Lines changed: 1 addition & 0 deletions
@@ -312,6 +312,7 @@ impl DisplayAs for FilterExec {
                     self.predicate, display_projections
                 )
             }
+            DisplayFormatType::TreeRender => unimplemented!(),
         }
     }
 }

native/core/src/execution/operators/scan.rs

Lines changed: 1 addition & 0 deletions
@@ -384,6 +384,7 @@ impl DisplayAs for ScanExec {
                     .collect();
                 write!(f, "schema=[{}]", fields.join(", "))?;
             }
+            DisplayFormatType::TreeRender => unimplemented!(),
         }
         Ok(())
     }

native/core/src/execution/shuffle/shuffle_writer.rs

Lines changed: 1 addition & 0 deletions
@@ -120,6 +120,7 @@ impl DisplayAs for ShuffleWriterExec {
                     self.partitioning, self.enable_fast_encoding, self.codec
                 )
             }
+            DisplayFormatType::TreeRender => unimplemented!(),
         }
     }
 }
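The same one-line change lands in `CopyExec`, `ExpandExec`, `FilterExec`, `ScanExec`, and `ShuffleWriterExec` above: DataFusion 47 adds a `TreeRender` variant to `DisplayFormatType`, used by its tree-style explain renderer, so every exhaustive match needs a new arm, stubbed here with `unimplemented!()`. A minimal sketch of a `DisplayAs` implementation that handles the new variant (assuming the DataFusion 47 trait; `MyExec` and its label are illustrative):

```rust
use std::fmt::Formatter;

use datafusion::physical_plan::{DisplayAs, DisplayFormatType};

/// Illustrative operator; only the display logic is shown.
struct MyExec {
    mode: String,
}

impl DisplayAs for MyExec {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, "MyExec [mode={}]", self.mode)
            }
            // New in DataFusion 47: a compact form used by the tree renderer.
            // Rather than panicking, an operator can emit a short label here.
            DisplayFormatType::TreeRender => write!(f, "MyExec"),
        }
    }
}
```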

native/core/src/parquet/parquet_exec.rs

Lines changed: 29 additions & 17 deletions
@@ -21,7 +21,9 @@ use crate::parquet::schema_adapter::SparkSchemaAdapterFactory;
 use arrow::datatypes::{Field, SchemaRef};
 use datafusion::config::TableParquetOptions;
 use datafusion::datasource::listing::PartitionedFile;
-use datafusion::datasource::physical_plan::{FileScanConfig, FileSource, ParquetSource};
+use datafusion::datasource::physical_plan::{
+    FileGroup, FileScanConfigBuilder, FileSource, ParquetSource,
+};
 use datafusion::datasource::source::DataSourceExec;
 use datafusion::execution::object_store::ObjectStoreUrl;
 use datafusion::physical_expr::expressions::BinaryExpr;
@@ -80,23 +82,33 @@ pub(crate) fn init_datasource_exec(
             parquet_source = parquet_source.with_predicate(Arc::clone(data_schema), filter);
         }
     }
+
+    let file_groups = file_groups
+        .iter()
+        .map(|files| FileGroup::new(files.clone()))
+        .collect();
+
     let file_scan_config = match (data_schema, projection_vector, partition_fields) {
-        (Some(data_schema), Some(projection_vector), Some(partition_fields)) => get_file_config(
-            data_schema,
-            partition_schema,
-            file_groups,
-            object_store_url,
-            Arc::new(parquet_source),
-        )
-        .with_projection(Some(projection_vector))
-        .with_table_partition_cols(partition_fields),
-        _ => get_file_config(
+        (Some(data_schema), Some(projection_vector), Some(partition_fields)) => {
+            get_file_config_builder(
+                data_schema,
+                partition_schema,
+                file_groups,
+                object_store_url,
+                Arc::new(parquet_source),
+            )
+            .with_projection(Some(projection_vector))
+            .with_table_partition_cols(partition_fields)
+            .build()
+        }
+        _ => get_file_config_builder(
             required_schema,
             partition_schema,
             file_groups,
             object_store_url,
             Arc::new(parquet_source),
-        ),
+        )
+        .build(),
     };
 
     Ok(Arc::new(DataSourceExec::new(Arc::new(file_scan_config))))
@@ -113,13 +125,13 @@ fn get_options(session_timezone: &str) -> (TableParquetOptions, SparkParquetOpt
     (table_parquet_options, spark_parquet_options)
 }
 
-fn get_file_config(
+fn get_file_config_builder(
     schema: SchemaRef,
     partition_schema: Option<SchemaRef>,
-    file_groups: Vec<Vec<PartitionedFile>>,
+    file_groups: Vec<FileGroup>,
     object_store_url: ObjectStoreUrl,
     file_source: Arc<dyn FileSource>,
-) -> FileScanConfig {
+) -> FileScanConfigBuilder {
     match partition_schema {
         Some(partition_schema) => {
             let partition_fields: Vec<Field> = partition_schema
@@ -129,11 +141,11 @@ fn get_file_config(
                     Field::new(field.name(), field.data_type().clone(), field.is_nullable())
                 })
                 .collect_vec();
-            FileScanConfig::new(object_store_url, Arc::clone(&schema), file_source)
+            FileScanConfigBuilder::new(object_store_url, Arc::clone(&schema), file_source)
                 .with_file_groups(file_groups)
                 .with_table_partition_cols(partition_fields)
         }
-        _ => FileScanConfig::new(object_store_url, Arc::clone(&schema), file_source)
+        _ => FileScanConfigBuilder::new(object_store_url, Arc::clone(&schema), file_source)
             .with_file_groups(file_groups),
     }
 }
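Two API changes drive this rewrite: file groups are now wrapped in `FileGroup` instead of `Vec<Vec<PartitionedFile>>`, and the scan configuration is assembled with `FileScanConfigBuilder` and frozen with `build()` rather than by mutating `FileScanConfig`. A minimal sketch of that builder flow under the DataFusion 47 API used here; the schema, file path, and function name are illustrative:

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion::common::Result;
use datafusion::config::TableParquetOptions;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::{FileGroup, FileScanConfigBuilder, ParquetSource};
use datafusion::datasource::source::DataSourceExec;
use datafusion::execution::object_store::ObjectStoreUrl;

fn build_parquet_scan() -> Result<Arc<DataSourceExec>> {
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, true)]));
    let parquet_source = ParquetSource::new(TableParquetOptions::default());

    // DataFusion 47: files are grouped explicitly...
    let group = FileGroup::new(vec![PartitionedFile::from_path(
        "data/part-0.parquet".to_string(),
    )?]);

    // ...and the scan config is assembled through the builder, then frozen with build().
    let config = FileScanConfigBuilder::new(
        ObjectStoreUrl::local_filesystem(),
        schema,
        Arc::new(parquet_source),
    )
    .with_file_groups(vec![group])
    .build();

    Ok(Arc::new(DataSourceExec::new(Arc::new(config))))
}
```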

native/core/src/parquet/schema_adapter.rs

Lines changed: 7 additions & 73 deletions
@@ -18,7 +18,7 @@
 //! Custom schema adapter that uses Spark-compatible conversions
 
 use crate::parquet::parquet_support::{spark_parquet_convert, SparkParquetOptions};
-use arrow::array::{new_null_array, Array, RecordBatch, RecordBatchOptions};
+use arrow::array::{new_null_array, RecordBatch, RecordBatchOptions};
 use arrow::datatypes::{Schema, SchemaRef};
 use datafusion::datasource::schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper};
 use datafusion::physical_plan::ColumnarValue;
@@ -50,11 +50,10 @@ impl SchemaAdapterFactory for SparkSchemaAdapterFactory {
     fn create(
         &self,
         required_schema: SchemaRef,
-        table_schema: SchemaRef,
+        _table_schema: SchemaRef,
     ) -> Box<dyn SchemaAdapter> {
         Box::new(SparkSchemaAdapter {
             required_schema,
-            table_schema,
             parquet_options: self.parquet_options.clone(),
         })
     }
@@ -67,12 +66,6 @@ pub struct SparkSchemaAdapter {
     /// The schema for the table, projected to include only the fields being output (projected) by the
     /// associated ParquetExec
     required_schema: SchemaRef,
-    /// The entire table schema for the table we're using this to adapt.
-    ///
-    /// This is used to evaluate any filters pushed down into the scan
-    /// which may refer to columns that are not referred to anywhere
-    /// else in the plan.
-    table_schema: SchemaRef,
     /// Spark cast options
     parquet_options: SparkParquetOptions,
 }
@@ -139,7 +132,6 @@ impl SchemaAdapter for SparkSchemaAdapter {
             Arc::new(SchemaMapping {
                 required_schema: Arc::<Schema>::clone(&self.required_schema),
                 field_mappings,
-                table_schema: Arc::<Schema>::clone(&self.table_schema),
                 parquet_options: self.parquet_options.clone(),
             }),
             projection,
@@ -186,11 +178,6 @@ pub struct SchemaMapping {
     /// They are Options instead of just plain `usize`s because the table could
     /// have fields that don't exist in the file.
     field_mappings: Vec<Option<usize>>,
-    /// The entire table schema, as opposed to the projected_table_schema (which
-    /// only contains the columns that we are projecting out of this query).
-    /// This contains all fields in the table, regardless of if they will be
-    /// projected out or not.
-    table_schema: SchemaRef,
     /// Spark cast options
     parquet_options: SparkParquetOptions,
 }
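With `table_schema` removed, the adapter keeps only `field_mappings`, which records where each required column lives in the file (`None` when the file lacks the column and the reader must null-fill it). A standalone sketch of that mapping idea, using the same case-insensitive name comparison style as the removed code; the function is illustrative, not Comet's actual implementation:

```rust
use arrow::datatypes::{DataType, Field, Schema};

/// For each field the query requires, find its index in the file schema.
/// `None` means the file lacks the column, so the reader fills it with nulls.
fn field_mappings(required: &Schema, file: &Schema, case_sensitive: bool) -> Vec<Option<usize>> {
    required
        .fields()
        .iter()
        .map(|req| {
            file.fields().iter().position(|f| {
                if case_sensitive {
                    f.name() == req.name()
                } else {
                    f.name().to_lowercase() == req.name().to_lowercase()
                }
            })
        })
        .collect()
}

fn main() {
    let required = Schema::new(vec![
        Field::new("ID", DataType::Int64, true),
        Field::new("extra", DataType::Utf8, true),
    ]);
    let file = Schema::new(vec![Field::new("id", DataType::Int64, true)]);

    // Case-insensitive: "ID" matches "id"; "extra" is missing and would be null-filled.
    assert_eq!(field_mappings(&required, &file, false), vec![Some(0), None]);
    // Case-sensitive: neither column matches.
    assert_eq!(field_mappings(&required, &file, true), vec![None, None]);
}
```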
@@ -239,59 +226,6 @@ impl SchemaMapper for SchemaMapping {
         let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?;
         Ok(record_batch)
     }
-
-    /// Adapts a [`RecordBatch`]'s schema into one that has all the correct output types and only
-    /// contains the fields that exist in both the file schema and table schema.
-    ///
-    /// Unlike `map_batch` this method also preserves the columns that
-    /// may not appear in the final output (`projected_table_schema`) but may
-    /// appear in push down predicates
-    fn map_partial_batch(&self, batch: RecordBatch) -> datafusion::common::Result<RecordBatch> {
-        let batch_cols = batch.columns().to_vec();
-        let schema = batch.schema();
-
-        // for each field in the batch's schema (which is based on a file, not a table)...
-        let (cols, fields) = schema
-            .fields()
-            .iter()
-            .zip(batch_cols.iter())
-            .flat_map(|(field, batch_col)| {
-                self.table_schema
-                    .fields()
-                    .iter()
-                    .enumerate()
-                    .find(|(_, b)| {
-                        if self.parquet_options.case_sensitive {
-                            b.name() == field.name()
-                        } else {
-                            b.name().to_lowercase() == field.name().to_lowercase()
-                        }
-                    })
-                    // but if we do have it,
-                    .map(|(_, table_field)| {
-                        // try to cast it into the correct output type. we don't want to ignore this
-                        // error, though, so it's propagated.
-                        spark_parquet_convert(
-                            ColumnarValue::Array(Arc::clone(batch_col)),
-                            table_field.data_type(),
-                            &self.parquet_options,
-                        )?
-                        .into_array(batch_col.len())
-                        // and if that works, return the field and column.
-                        .map(|new_col| (new_col, table_field.as_ref().clone()))
-                    })
-            })
-            .collect::<Result<Vec<_>, _>>()?
-            .into_iter()
-            .unzip::<_, _, Vec<_>, Vec<_>>();
-
-        // Necessary to handle empty batches
-        let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
-
-        let schema = Arc::new(Schema::new_with_metadata(fields, schema.metadata().clone()));
-        let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?;
-        Ok(record_batch)
-    }
 }
 
 #[cfg(test)]
@@ -306,7 +240,7 @@ mod test {
     use datafusion::common::config::TableParquetOptions;
     use datafusion::common::DataFusionError;
    use datafusion::datasource::listing::PartitionedFile;
-    use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource};
+    use datafusion::datasource::physical_plan::{FileGroup, FileScanConfigBuilder, ParquetSource};
     use datafusion::datasource::source::DataSourceExec;
     use datafusion::execution::object_store::ObjectStoreUrl;
     use datafusion::execution::TaskContext;
@@ -378,11 +312,11 @@ mod test {
             )),
         );
 
+        let files = FileGroup::new(vec![PartitionedFile::from_path(filename.to_string())?]);
         let file_scan_config =
-            FileScanConfig::new(object_store_url, required_schema, parquet_source)
-                .with_file_groups(vec![vec![PartitionedFile::from_path(
-                    filename.to_string(),
-                )?]]);
+            FileScanConfigBuilder::new(object_store_url, required_schema, parquet_source)
+                .with_file_groups(vec![files])
+                .build();
 
         let parquet_exec = DataSourceExec::new(Arc::new(file_scan_config));
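For context, the `DataSourceExec` built in this test is driven like any other `ExecutionPlan`. A hedged sketch of collecting one partition with a default `TaskContext` (assuming the standard DataFusion 47 execution API; `futures` is used only to drain the stream):

```rust
use std::sync::Arc;

use datafusion::common::Result;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::ExecutionPlan;
use futures::TryStreamExt;

async fn collect_partition(plan: Arc<dyn ExecutionPlan>) -> Result<()> {
    // Execute partition 0 with a default task context and drain the record batches.
    let ctx = Arc::new(TaskContext::default());
    let batches: Vec<_> = plan.execute(0, ctx)?.try_collect().await?;
    println!("read {} batches", batches.len());
    Ok(())
}
```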
