Skip to content

Commit c09bf2a

Browse files
committed
Prepare for arrow 15 release
1 parent 2a15e3f commit c09bf2a

File tree

9 files changed

+39
-20
lines changed

9 files changed

+39
-20
lines changed

Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,3 +35,7 @@ exclude = ["datafusion-cli"]
3535
codegen-units = 1
3636
lto = true
3737

38+
[patch.crates-io]
39+
arrow = { git = "https://github.com/apache/arrow-rs.git", rev = "8e1666a8206f2eea4dd4e55c9365859c6a32a3f0"}
40+
arrow-flight = { git = "https://github.com/apache/arrow-rs.git", rev = "8e1666a8206f2eea4dd4e55c9365859c6a32a3f0"}
41+
parquet = { git = "https://github.com/apache/arrow-rs.git", rev = "8e1666a8206f2eea4dd4e55c9365859c6a32a3f0"}

datafusion/common/src/scalar.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1059,7 +1059,7 @@ impl ScalarValue {
10591059
let offsets_array = offsets.finish();
10601060
let array_data = ArrayDataBuilder::new(data_type.clone())
10611061
.len(offsets_array.len() - 1)
1062-
.null_bit_buffer(valid.finish())
1062+
.null_bit_buffer(Some(valid.finish()))
10631063
.add_buffer(offsets_array.data().buffers()[0].clone())
10641064
.add_child_data(flat_array.data().clone());
10651065

datafusion/core/src/avro_to_arrow/arrow_array_reader.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -484,7 +484,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
484484
ArrayData::builder(list_field.data_type().clone())
485485
.len(valid_len)
486486
.add_buffer(bool_values.into())
487-
.null_bit_buffer(bool_nulls.into())
487+
.null_bit_buffer(Some(bool_nulls.into()))
488488
.build()
489489
.unwrap()
490490
}
@@ -567,10 +567,9 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
567567
let arrays =
568568
self.build_struct_array(rows.as_slice(), fields.as_slice(), &[])?;
569569
let data_type = DataType::Struct(fields.clone());
570-
let buf = null_buffer.into();
571570
ArrayDataBuilder::new(data_type)
572571
.len(rows.len())
573-
.null_bit_buffer(buf)
572+
.null_bit_buffer(Some(null_buffer.into()))
574573
.child_data(arrays.into_iter().map(|a| a.data().clone()).collect())
575574
.build()
576575
.unwrap()
@@ -587,7 +586,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
587586
.len(list_len)
588587
.add_buffer(Buffer::from_slice_ref(&offsets))
589588
.add_child_data(array_data)
590-
.null_bit_buffer(list_nulls.into())
589+
.null_bit_buffer(Some(list_nulls.into()))
591590
.build()
592591
.unwrap();
593592
Ok(Arc::new(GenericListArray::<OffsetSize>::from(list_data)))
@@ -778,7 +777,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
778777
let data_type = DataType::Struct(fields.clone());
779778
let data = ArrayDataBuilder::new(data_type)
780779
.len(len)
781-
.null_bit_buffer(null_buffer.into())
780+
.null_bit_buffer(Some(null_buffer.into()))
782781
.child_data(
783782
arrays.into_iter().map(|a| a.data().clone()).collect(),
784783
)

datafusion/core/src/avro_to_arrow/schema.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,8 @@ fn schema_to_field_with_props(
103103
.iter()
104104
.map(|s| schema_to_field_with_props(s, None, has_nullable, None))
105105
.collect::<Result<Vec<Field>>>()?;
106-
DataType::Union(fields, UnionMode::Dense)
106+
let type_ids = (0_i8..fields.len() as i8).collect();
107+
DataType::Union(fields, type_ids, UnionMode::Dense)
107108
}
108109
}
109110
AvroSchema::Record { name, fields, .. } => {
@@ -212,7 +213,7 @@ fn default_field_name(dt: &DataType) -> &str {
212213
DataType::FixedSizeList(_, _) => "fixed_size_list",
213214
DataType::LargeList(_) => "largelist",
214215
DataType::Struct(_) => "struct",
215-
DataType::Union(_, _) => "union",
216+
DataType::Union(_, _, _) => "union",
216217
DataType::Dictionary(_, _) => "map",
217218
DataType::Map(_, _) => unimplemented!("Map support not implemented"),
218219
DataType::Decimal(_, _) => "decimal",

datafusion/core/src/physical_plan/file_format/parquet.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,9 @@ use futures::{Stream, StreamExt, TryStreamExt};
3737
use log::debug;
3838
use parquet::arrow::{
3939
arrow_reader::ParquetRecordBatchReader, ArrowReader, ArrowWriter,
40-
ParquetFileArrowReader,
40+
ParquetFileArrowReader, ProjectionMask,
4141
};
42+
use parquet::file::reader::FileReader;
4243
use parquet::file::{
4344
metadata::RowGroupMetaData, properties::WriterProperties,
4445
reader::SerializedFileReader, serialized_reader::ReadOptionsBuilder,
@@ -352,14 +353,18 @@ impl ParquetExecStream {
352353
opt.build(),
353354
)?;
354355

356+
let file_metadata = file_reader.metadata().file_metadata();
357+
let parquet_schema = file_metadata.schema_descr_ptr();
358+
355359
let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));
360+
let arrow_schema = arrow_reader.get_schema()?;
356361

357362
let adapted_projections = self
358363
.adapter
359-
.map_projections(&arrow_reader.get_schema()?, &self.projection)?;
364+
.map_projections(&arrow_schema, &self.projection)?;
360365

361-
let reader = arrow_reader
362-
.get_record_reader_by_columns(adapted_projections, self.batch_size)?;
366+
let mask = ProjectionMask::roots(&parquet_schema, adapted_projections);
367+
let reader = arrow_reader.get_record_reader_by_columns(mask, self.batch_size)?;
363368

364369
Ok(reader)
365370
}

datafusion/proto/proto/datafusion.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -398,6 +398,7 @@ enum UnionMode{
398398
message Union{
399399
repeated Field union_types = 1;
400400
UnionMode union_mode = 2;
401+
repeated int32 type_ids = 3;
401402
}
402403

403404
message ScalarListValue{

datafusion/proto/src/from_proto.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -338,9 +338,16 @@ impl TryFrom<&protobuf::arrow_type::ArrowTypeEnum> for DataType {
338338
let union_types = union
339339
.union_types
340340
.iter()
341-
.map(|field| field.try_into())
341+
.map(TryInto::try_into)
342342
.collect::<Result<Vec<_>, _>>()?;
343-
DataType::Union(union_types, union_mode)
343+
344+
// Default to index based type ids if not provided
345+
let type_ids = match union.type_ids.is_empty() {
346+
true => (0..union_types.len() as i8).collect(),
347+
false => union.type_ids.iter().map(|i| *i as i8).collect(),
348+
};
349+
350+
DataType::Union(union_types, type_ids, union_mode)
344351
}
345352
arrow_type::ArrowTypeEnum::Dictionary(dict) => {
346353
let key_datatype = dict.as_ref().key.as_deref().required("key")?;

datafusion/proto/src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -362,6 +362,7 @@ mod roundtrip_tests {
362362
Field::new("name", DataType::Utf8, false),
363363
Field::new("datatype", DataType::Binary, false),
364364
],
365+
vec![0, 2, 3],
365366
UnionMode::Dense,
366367
),
367368
DataType::Union(
@@ -379,6 +380,7 @@ mod roundtrip_tests {
379380
true,
380381
),
381382
],
383+
vec![1, 2, 3],
382384
UnionMode::Sparse,
383385
),
384386
DataType::Dictionary(
@@ -514,6 +516,7 @@ mod roundtrip_tests {
514516
Field::new("name", DataType::Utf8, false),
515517
Field::new("datatype", DataType::Binary, false),
516518
],
519+
vec![7, 5, 3],
517520
UnionMode::Sparse,
518521
),
519522
DataType::Union(
@@ -531,6 +534,7 @@ mod roundtrip_tests {
531534
true,
532535
),
533536
],
537+
vec![5, 8, 1],
534538
UnionMode::Dense,
535539
),
536540
DataType::Dictionary(

datafusion/proto/src/to_proto.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -200,17 +200,15 @@ impl From<&DataType> for protobuf::arrow_type::ArrowTypeEnum {
200200
.map(|field| field.into())
201201
.collect::<Vec<_>>(),
202202
}),
203-
DataType::Union(union_types, union_mode) => {
203+
DataType::Union(union_types, type_ids, union_mode) => {
204204
let union_mode = match union_mode {
205205
UnionMode::Sparse => protobuf::UnionMode::Sparse,
206206
UnionMode::Dense => protobuf::UnionMode::Dense,
207207
};
208208
Self::Union(protobuf::Union {
209-
union_types: union_types
210-
.iter()
211-
.map(|field| field.into())
212-
.collect::<Vec<_>>(),
209+
union_types: union_types.iter().map(Into::into).collect(),
213210
union_mode: union_mode.into(),
211+
type_ids: type_ids.iter().map(|x| *x as i32).collect(),
214212
})
215213
}
216214
DataType::Dictionary(key_type, value_type) => {
@@ -1188,7 +1186,7 @@ impl TryFrom<&DataType> for protobuf::scalar_type::Datatype {
11881186
| DataType::FixedSizeList(_, _)
11891187
| DataType::LargeList(_)
11901188
| DataType::Struct(_)
1191-
| DataType::Union(_, _)
1189+
| DataType::Union(_, _, _)
11921190
| DataType::Dictionary(_, _)
11931191
| DataType::Map(_, _)
11941192
| DataType::Decimal(_, _) => {

0 commit comments

Comments
 (0)