Upgrade to arrow 15 #2631

Merged
merged 5 commits on May 31, 2022
1 change: 0 additions & 1 deletion Cargo.toml
@@ -34,4 +34,3 @@ exclude = ["datafusion-cli"]
[profile.release]
codegen-units = 1
lto = true

2 changes: 1 addition & 1 deletion datafusion-cli/Cargo.toml
@@ -29,7 +29,7 @@ rust-version = "1.59"
readme = "README.md"

[dependencies]
arrow = { version = "14.0.0" }
arrow = { version = "15.0.0" }
clap = { version = "3", features = ["derive", "cargo"] }
datafusion = { path = "../datafusion/core", version = "8.0.0" }
dirs = "4.0.0"
2 changes: 1 addition & 1 deletion datafusion-examples/Cargo.toml
@@ -34,7 +34,7 @@ path = "examples/avro_sql.rs"
required-features = ["datafusion/avro"]

[dev-dependencies]
arrow-flight = { version = "14.0.0" }
arrow-flight = { version = "15.0.0" }
async-trait = "0.1.41"
datafusion = { path = "../datafusion/core" }
futures = "0.3"
4 changes: 2 additions & 2 deletions datafusion/common/Cargo.toml
@@ -38,10 +38,10 @@ jit = ["cranelift-module"]
pyarrow = ["pyo3"]

[dependencies]
arrow = { version = "14.0.0", features = ["prettyprint"] }
arrow = { version = "15.0.0", features = ["prettyprint"] }
avro-rs = { version = "0.13", features = ["snappy"], optional = true }
cranelift-module = { version = "0.84.0", optional = true }
ordered-float = "3.0"
parquet = { version = "14.0.0", features = ["arrow"], optional = true }
parquet = { version = "15.0.0", features = ["arrow"], optional = true }
pyo3 = { version = "0.16", optional = true }
sqlparser = "0.17"
2 changes: 1 addition & 1 deletion datafusion/common/src/scalar.rs
@@ -1059,7 +1059,7 @@ impl ScalarValue {
let offsets_array = offsets.finish();
let array_data = ArrayDataBuilder::new(data_type.clone())
.len(offsets_array.len() - 1)
- .null_bit_buffer(valid.finish())
+ .null_bit_buffer(Some(valid.finish()))
.add_buffer(offsets_array.data().buffers()[0].clone())
.add_child_data(flat_array.data().clone());

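
For context on the change above: in arrow 15, `ArrayDataBuilder::null_bit_buffer` takes an `Option<Buffer>` rather than a `Buffer`, which is why every call site in this PR now wraps the validity bitmap in `Some(...)`. A minimal standalone sketch of the new call shape (the values and bitmap below are illustrative, not taken from DataFusion):

```rust
use arrow::array::{Array, ArrayData, Int32Array};
use arrow::buffer::Buffer;
use arrow::datatypes::DataType;

fn main() {
    // Values buffer for [1, NULL, 3]; the slot behind the null is a placeholder.
    let values = Buffer::from_slice_ref(&[1_i32, 0, 3]);
    // Validity bitmap: bits 0 and 2 set => slots 0 and 2 are valid.
    let validity = Buffer::from([0b0000_0101u8]);

    let data = ArrayData::builder(DataType::Int32)
        .len(3)
        .add_buffer(values)
        // arrow 15: the validity buffer is now Option<Buffer>
        .null_bit_buffer(Some(validity))
        .build()
        .unwrap();

    let array = Int32Array::from(data);
    assert_eq!(array.null_count(), 1);
}
```
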
4 changes: 2 additions & 2 deletions datafusion/core/Cargo.toml
@@ -55,7 +55,7 @@ unicode_expressions = ["datafusion-physical-expr/regex_expressions", "datafusion

[dependencies]
ahash = { version = "0.7", default-features = false }
arrow = { version = "14.0.0", features = ["prettyprint"] }
arrow = { version = "15.0.0", features = ["prettyprint"] }
async-trait = "0.1.41"
avro-rs = { version = "0.13", features = ["snappy"], optional = true }
chrono = { version = "0.4", default-features = false }
@@ -76,7 +76,7 @@ num-traits = { version = "0.2", optional = true }
num_cpus = "1.13.0"
ordered-float = "3.0"
parking_lot = "0.12"
parquet = { version = "14.0.0", features = ["arrow"] }
parquet = { version = "15.0.0", features = ["arrow"] }
paste = "^1.0"
pin-project-lite = "^0.2.7"
pyo3 = { version = "0.16", optional = true }
2 changes: 1 addition & 1 deletion datafusion/core/fuzz-utils/Cargo.toml
@@ -23,6 +23,6 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
arrow = { version = "14.0.0", features = ["prettyprint"] }
arrow = { version = "15.0.0", features = ["prettyprint"] }
env_logger = "0.9.0"
rand = "0.8"
9 changes: 4 additions & 5 deletions datafusion/core/src/avro_to_arrow/arrow_array_reader.rs
@@ -484,7 +484,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
ArrayData::builder(list_field.data_type().clone())
.len(valid_len)
.add_buffer(bool_values.into())
- .null_bit_buffer(bool_nulls.into())
+ .null_bit_buffer(Some(bool_nulls.into()))
.build()
.unwrap()
}
@@ -567,10 +567,9 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
let arrays =
self.build_struct_array(rows.as_slice(), fields.as_slice(), &[])?;
let data_type = DataType::Struct(fields.clone());
- let buf = null_buffer.into();
ArrayDataBuilder::new(data_type)
.len(rows.len())
- .null_bit_buffer(buf)
+ .null_bit_buffer(Some(null_buffer.into()))
.child_data(arrays.into_iter().map(|a| a.data().clone()).collect())
.build()
.unwrap()
@@ -587,7 +586,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
.len(list_len)
.add_buffer(Buffer::from_slice_ref(&offsets))
.add_child_data(array_data)
- .null_bit_buffer(list_nulls.into())
+ .null_bit_buffer(Some(list_nulls.into()))
.build()
.unwrap();
Ok(Arc::new(GenericListArray::<OffsetSize>::from(list_data)))
@@ -778,7 +777,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
let data_type = DataType::Struct(fields.clone());
let data = ArrayDataBuilder::new(data_type)
.len(len)
- .null_bit_buffer(null_buffer.into())
+ .null_bit_buffer(Some(null_buffer.into()))
.child_data(
arrays.into_iter().map(|a| a.data().clone()).collect(),
)
5 changes: 3 additions & 2 deletions datafusion/core/src/avro_to_arrow/schema.rs
@@ -103,7 +103,8 @@ fn schema_to_field_with_props(
.iter()
.map(|s| schema_to_field_with_props(s, None, has_nullable, None))
.collect::<Result<Vec<Field>>>()?;
- DataType::Union(fields, UnionMode::Dense)
+ let type_ids = (0_i8..fields.len() as i8).collect();
+ DataType::Union(fields, type_ids, UnionMode::Dense)
}
}
AvroSchema::Record { name, fields, .. } => {
@@ -212,7 +213,7 @@ fn default_field_name(dt: &DataType) -> &str {
DataType::FixedSizeList(_, _) => "fixed_size_list",
DataType::LargeList(_) => "largelist",
DataType::Struct(_) => "struct",
DataType::Union(_, _) => "union",
DataType::Union(_, _, _) => "union",
DataType::Dictionary(_, _) => "map",
DataType::Map(_, _) => unimplemented!("Map support not implemented"),
DataType::Decimal(_, _) => "decimal",
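
For context: arrow 15 extends `DataType::Union` with an explicit list of type ids, so the two-argument form `(fields, mode)` becomes `(fields, type_ids, mode)`; the Avro conversion above simply assigns sequential ids. A small sketch under that assumption (the field names are made up):

```rust
use arrow::datatypes::{DataType, Field, UnionMode};

fn main() {
    let fields = vec![
        Field::new("int_variant", DataType::Int32, true),
        Field::new("utf8_variant", DataType::Utf8, true),
    ];
    // arrow 15: type ids are explicit; 0..N mirrors what
    // schema_to_field_with_props does for Avro unions.
    let type_ids: Vec<i8> = (0_i8..fields.len() as i8).collect();

    let union = DataType::Union(fields, type_ids, UnionMode::Dense);
    assert!(matches!(union, DataType::Union(_, _, _)));
}
```
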
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/common.rs
@@ -365,7 +365,7 @@ mod tests {
let expected = Statistics {
is_exact: true,
num_rows: Some(3),
- total_byte_size: Some(416), // this might change a bit if the way we compute the size changes
+ total_byte_size: Some(464), // this might change a bit if the way we compute the size changes
Comment from the PR author: The changes to DataType have made it bigger; I don't think this is a problem.

column_statistics: Some(vec![
ColumnStatistics {
distinct_count: None,
13 changes: 9 additions & 4 deletions datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -37,8 +37,9 @@ use futures::{Stream, StreamExt, TryStreamExt};
use log::debug;
use parquet::arrow::{
arrow_reader::ParquetRecordBatchReader, ArrowReader, ArrowWriter,
- ParquetFileArrowReader,
+ ParquetFileArrowReader, ProjectionMask,
};
+ use parquet::file::reader::FileReader;
use parquet::file::{
metadata::RowGroupMetaData, properties::WriterProperties,
reader::SerializedFileReader, serialized_reader::ReadOptionsBuilder,
@@ -352,14 +353,18 @@ impl ParquetExecStream {
opt.build(),
)?;

+ let file_metadata = file_reader.metadata().file_metadata();
+ let parquet_schema = file_metadata.schema_descr_ptr();

let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));
+ let arrow_schema = arrow_reader.get_schema()?;

let adapted_projections = self
.adapter
- .map_projections(&arrow_reader.get_schema()?, &self.projection)?;
+ .map_projections(&arrow_schema, &self.projection)?;

- let reader = arrow_reader
- .get_record_reader_by_columns(adapted_projections, self.batch_size)?;
+ let mask = ProjectionMask::roots(&parquet_schema, adapted_projections);
+ let reader = arrow_reader.get_record_reader_by_columns(mask, self.batch_size)?;

Ok(reader)
}
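
For context: in parquet 15, `get_record_reader_by_columns` takes a `ProjectionMask` built against the parquet schema instead of a plain list of column indices, which is why the code above now reaches into the file metadata (and imports `FileReader`). A rough standalone sketch of the new call shape; the file path, column indices, and batch size are placeholders, and error handling is simplified:

```rust
use std::fs::File;
use std::sync::Arc;

use parquet::arrow::{ArrowReader, ParquetFileArrowReader, ProjectionMask};
use parquet::file::reader::{FileReader, SerializedFileReader};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // "data.parquet" is a placeholder path.
    let file = File::open("data.parquet")?;
    let file_reader = SerializedFileReader::new(file)?;

    // The projection mask is built from the parquet (not arrow) schema.
    let parquet_schema = file_reader.metadata().file_metadata().schema_descr_ptr();
    let mask = ProjectionMask::roots(&parquet_schema, [0, 2]);

    let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));
    let reader = arrow_reader.get_record_reader_by_columns(mask, 1024)?;
    for batch in reader {
        println!("read {} rows", batch?.num_rows());
    }
    Ok(())
}
```
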
2 changes: 1 addition & 1 deletion datafusion/expr/Cargo.toml
@@ -36,6 +36,6 @@ path = "src/lib.rs"

[dependencies]
ahash = { version = "0.7", default-features = false }
arrow = { version = "14.0.0", features = ["prettyprint"] }
arrow = { version = "15.0.0", features = ["prettyprint"] }
datafusion-common = { path = "../common", version = "8.0.0" }
sqlparser = "0.17"
2 changes: 1 addition & 1 deletion datafusion/jit/Cargo.toml
@@ -36,7 +36,7 @@ path = "src/lib.rs"
jit = []

[dependencies]
arrow = { version = "14.0.0" }
arrow = { version = "15.0.0" }
cranelift = "0.84.0"
cranelift-jit = "0.84.0"
cranelift-module = "0.84.0"
2 changes: 1 addition & 1 deletion datafusion/physical-expr/Cargo.toml
@@ -40,7 +40,7 @@ unicode_expressions = ["unicode-segmentation"]

[dependencies]
ahash = { version = "0.7", default-features = false }
arrow = { version = "14.0.0", features = ["prettyprint"] }
arrow = { version = "15.0.0", features = ["prettyprint"] }
blake2 = { version = "^0.10.2", optional = true }
blake3 = { version = "1.0", optional = true }
chrono = { version = "0.4", default-features = false }
4 changes: 2 additions & 2 deletions datafusion/physical-expr/src/expressions/case.rs
@@ -594,10 +594,10 @@ mod tests {
.collect();

//let valid_array = vec![true, false, false, true, false, tru
- let null_buffer = Buffer::from_slice_ref(&[0b00101001u8]);
+ let null_buffer = Buffer::from([0b00101001u8]);
let load4 = ArrayDataBuilder::new(load4.data_type().clone())
.len(load4.len())
- .null_bit_buffer(null_buffer)
+ .null_bit_buffer(Some(null_buffer))
.buffers(load4.data().buffers().to_vec())
.build()
.unwrap();
2 changes: 1 addition & 1 deletion datafusion/proto/Cargo.toml
@@ -35,7 +35,7 @@ path = "src/lib.rs"
[features]

[dependencies]
arrow = { version = "14.0.0" }
arrow = { version = "15.0.0" }
datafusion = { path = "../core", version = "8.0.0" }
datafusion-common = { path = "../common", version = "8.0.0" }
datafusion-expr = { path = "../expr", version = "8.0.0" }
1 change: 1 addition & 0 deletions datafusion/proto/proto/datafusion.proto
@@ -616,6 +616,7 @@ enum UnionMode{
message Union{
repeated Field union_types = 1;
UnionMode union_mode = 2;
+ repeated int32 type_ids = 3;
}

message ScalarListValue{
11 changes: 9 additions & 2 deletions datafusion/proto/src/from_proto.rs
@@ -338,9 +338,16 @@ impl TryFrom<&protobuf::arrow_type::ArrowTypeEnum> for DataType {
let union_types = union
.union_types
.iter()
- .map(|field| field.try_into())
+ .map(TryInto::try_into)
.collect::<Result<Vec<_>, _>>()?;
- DataType::Union(union_types, union_mode)

+ // Default to index based type ids if not provided
+ let type_ids = match union.type_ids.is_empty() {
+ true => (0..union_types.len() as i8).collect(),
+ false => union.type_ids.iter().map(|i| *i as i8).collect(),
+ };

+ DataType::Union(union_types, type_ids, union_mode)
}
arrow_type::ArrowTypeEnum::Dictionary(dict) => {
let key_datatype = dict.as_ref().key.as_deref().required("key")?;
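
For context: the deserialization path stays compatible with plans serialized before this change. When the protobuf `Union` carries no `type_ids`, the ids default to the field index; otherwise the stored `int32` values are narrowed to `i8`. A tiny self-contained sketch of that fallback; `union_type_ids` is a hypothetical helper that mirrors the `match` above, not an actual DataFusion function:

```rust
// Hypothetical helper mirroring the fallback in from_proto.rs: an empty id list
// means "use 0..N", otherwise narrow the protobuf's i32 ids down to i8.
fn union_type_ids(proto_type_ids: &[i32], num_fields: usize) -> Vec<i8> {
    if proto_type_ids.is_empty() {
        (0..num_fields as i8).collect()
    } else {
        proto_type_ids.iter().map(|i| *i as i8).collect()
    }
}

fn main() {
    // Old plans without type ids fall back to index-based ids.
    assert_eq!(union_type_ids(&[], 3), vec![0, 1, 2]);
    // New plans round-trip whatever ids were serialized.
    assert_eq!(union_type_ids(&[7, 5, 3], 3), vec![7, 5, 3]);
}
```
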
4 changes: 4 additions & 0 deletions datafusion/proto/src/lib.rs
@@ -568,6 +568,7 @@ mod roundtrip_tests {
Field::new("name", DataType::Utf8, false),
Field::new("datatype", DataType::Binary, false),
],
+ vec![0, 2, 3],
UnionMode::Dense,
),
DataType::Union(
@@ -585,6 +586,7 @@
true,
),
],
+ vec![1, 2, 3],
UnionMode::Sparse,
),
DataType::Dictionary(
@@ -720,6 +722,7 @@ mod roundtrip_tests {
Field::new("name", DataType::Utf8, false),
Field::new("datatype", DataType::Binary, false),
],
+ vec![7, 5, 3],
UnionMode::Sparse,
),
DataType::Union(
@@ -737,6 +740,7 @@
true,
),
],
+ vec![5, 8, 1],
UnionMode::Dense,
),
DataType::Dictionary(
10 changes: 4 additions & 6 deletions datafusion/proto/src/to_proto.rs
@@ -200,17 +200,15 @@ impl From<&DataType> for protobuf::arrow_type::ArrowTypeEnum {
.map(|field| field.into())
.collect::<Vec<_>>(),
}),
DataType::Union(union_types, union_mode) => {
DataType::Union(union_types, type_ids, union_mode) => {
let union_mode = match union_mode {
UnionMode::Sparse => protobuf::UnionMode::Sparse,
UnionMode::Dense => protobuf::UnionMode::Dense,
};
Self::Union(protobuf::Union {
- union_types: union_types
- .iter()
- .map(|field| field.into())
- .collect::<Vec<_>>(),
+ union_types: union_types.iter().map(Into::into).collect(),
union_mode: union_mode.into(),
+ type_ids: type_ids.iter().map(|x| *x as i32).collect(),
})
}
DataType::Dictionary(key_type, value_type) => {
@@ -1188,7 +1186,7 @@ impl TryFrom<&DataType> for protobuf::scalar_type::Datatype {
| DataType::FixedSizeList(_, _)
| DataType::LargeList(_)
| DataType::Struct(_)
- | DataType::Union(_, _)
+ | DataType::Union(_, _, _)
| DataType::Dictionary(_, _)
| DataType::Map(_, _)
| DataType::Decimal(_, _) => {
2 changes: 1 addition & 1 deletion datafusion/row/Cargo.toml
@@ -37,7 +37,7 @@ path = "src/lib.rs"
jit = ["datafusion-jit"]

[dependencies]
arrow = { version = "14.0.0" }
arrow = { version = "15.0.0" }
datafusion-common = { path = "../common", version = "8.0.0" }
datafusion-jit = { path = "../jit", version = "8.0.0", optional = true }
paste = "^1.0"
2 changes: 1 addition & 1 deletion datafusion/sql/Cargo.toml
@@ -38,7 +38,7 @@ unicode_expressions = []

[dependencies]
ahash = { version = "0.7", default-features = false }
arrow = { version = "14.0.0", features = ["prettyprint"] }
arrow = { version = "15.0.0", features = ["prettyprint"] }
datafusion-common = { path = "../common", version = "8.0.0" }
datafusion-expr = { path = "../expr", version = "8.0.0" }
hashbrown = "0.12"
2 changes: 1 addition & 1 deletion dev/build-arrow-ballista.sh
@@ -24,7 +24,7 @@ rm -rf arrow-ballista 2>/dev/null

# clone the repo
# TODO make repo/branch configurable
- git clone https://github.com/tustvold/arrow-ballista -b url-refactor
+ git clone https://github.com/tustvold/arrow-ballista -b arrow-15

# update dependencies to local crates
python ./dev/make-ballista-deps-local.py