Skip to content

Commit 91f0b17

Browse files
alamb and ariesdevil authored
feat: support reading and writing StringView and BinaryView in parquet (part 1) (#5618)
* support string and binary view read write parquet
* read to view type using offset_buffer
* add bench codes for view type read write
* using view_buffer instead of offset_buffer for view type parquet reader
* Revert "using view_buffer instead of offset_buffer for view type parquet reader"

This reverts commit d286521.

---------

Co-authored-by: Yijun Zhao <[email protected]>
1 parent 2c4b321 commit 91f0b17

File tree

14 files changed

+534
-47
lines changed

14 files changed

+534
-47
lines changed

arrow-array/src/array/byte_view_array.rs

+54-1
Original file line numberDiff line numberDiff line change
@@ -377,6 +377,18 @@ impl<T: ByteViewType + ?Sized> From<GenericByteViewArray<T>> for ArrayData {
377377
}
378378
}
379379

380+
impl<'a, Ptr, T> FromIterator<&'a Option<Ptr>> for GenericByteViewArray<T>
381+
where
382+
Ptr: AsRef<T::Native> + 'a,
383+
T: ByteViewType + ?Sized,
384+
{
385+
fn from_iter<I: IntoIterator<Item = &'a Option<Ptr>>>(iter: I) -> Self {
386+
iter.into_iter()
387+
.map(|o| o.as_ref().map(|p| p.as_ref()))
388+
.collect()
389+
}
390+
}
391+
380392
impl<Ptr, T: ByteViewType + ?Sized> FromIterator<Option<Ptr>> for GenericByteViewArray<T>
381393
where
382394
Ptr: AsRef<T::Native>,
@@ -400,7 +412,23 @@ where
400412
/// ```
401413
pub type BinaryViewArray = GenericByteViewArray<BinaryViewType>;
402414

403-
/// A [`GenericByteViewArray`] that stores uf8 data
415+
impl BinaryViewArray {
416+
/// Convert the [`BinaryViewArray`] to [`StringViewArray`]
417+
/// If items not utf8 data, validate will fail and error returned.
418+
pub fn to_string_view(self) -> Result<StringViewArray, ArrowError> {
419+
StringViewType::validate(self.views(), self.data_buffers())?;
420+
unsafe { Ok(self.to_string_view_unchecked()) }
421+
}
422+
423+
/// Convert the [`BinaryViewArray`] to [`StringViewArray`]
424+
/// # Safety
425+
/// Caller is responsible for ensuring that items in array are utf8 data.
426+
pub unsafe fn to_string_view_unchecked(self) -> StringViewArray {
427+
StringViewArray::new_unchecked(self.views, self.buffers, self.nulls)
428+
}
429+
}
430+
431+
/// A [`GenericByteViewArray`] that stores utf8 data
404432
///
405433
/// # Example
406434
/// ```
@@ -411,12 +439,37 @@ pub type BinaryViewArray = GenericByteViewArray<BinaryViewType>;
411439
/// ```
412440
pub type StringViewArray = GenericByteViewArray<StringViewType>;
413441

442+
impl StringViewArray {
443+
/// Convert the [`StringViewArray`] to [`BinaryViewArray`]
444+
pub fn to_binary_view(self) -> BinaryViewArray {
445+
unsafe { BinaryViewArray::new_unchecked(self.views, self.buffers, self.nulls) }
446+
}
447+
}
448+
414449
impl From<Vec<&str>> for StringViewArray {
415450
fn from(v: Vec<&str>) -> Self {
416451
Self::from_iter_values(v)
417452
}
418453
}
419454

455+
impl From<Vec<Option<&str>>> for StringViewArray {
456+
fn from(v: Vec<Option<&str>>) -> Self {
457+
v.into_iter().collect()
458+
}
459+
}
460+
461+
impl From<Vec<String>> for StringViewArray {
462+
fn from(v: Vec<String>) -> Self {
463+
Self::from_iter_values(v)
464+
}
465+
}
466+
467+
impl From<Vec<Option<String>>> for StringViewArray {
468+
fn from(v: Vec<Option<String>>) -> Self {
469+
v.into_iter().collect()
470+
}
471+
}
472+
420473
#[cfg(test)]
421474
mod tests {
422475
use crate::builder::StringViewBuilder;

arrow/src/util/bench_util.rs

+36
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,42 @@ pub fn create_string_array_with_len<Offset: OffsetSizeTrait>(
119119
.collect()
120120
}
121121

122+
/// Creates a random (but fixed-seeded) array of a given size, null density and length
123+
pub fn create_string_view_array_with_len(
124+
size: usize,
125+
null_density: f32,
126+
str_len: usize,
127+
mixed: bool,
128+
) -> StringViewArray {
129+
let rng = &mut seedable_rng();
130+
131+
let mut lengths = Vec::with_capacity(size);
132+
133+
// if mixed, we creates first half that string length small than 12 bytes and second half large than 12 bytes
134+
if mixed {
135+
for _ in 0..size / 2 {
136+
lengths.push(rng.gen_range(1..12));
137+
}
138+
for _ in size / 2..size {
139+
lengths.push(rng.gen_range(12..=std::cmp::max(30, str_len)));
140+
}
141+
} else {
142+
lengths.resize(size, str_len);
143+
}
144+
145+
lengths
146+
.into_iter()
147+
.map(|len| {
148+
if rng.gen::<f32>() < null_density {
149+
None
150+
} else {
151+
let value: Vec<u8> = rng.sample_iter(&Alphanumeric).take(len).collect();
152+
Some(String::from_utf8(value).unwrap())
153+
}
154+
})
155+
.collect()
156+
}
157+
122158
/// Creates a random (but fixed-seeded) array of a given size and null density
123159
/// consisting of random 4 character alphanumeric strings
124160
pub fn create_string_dict_array<K: ArrowDictionaryKeyType>(

arrow/src/util/data_gen.rs

+9
Original file line numberDiff line numberDiff line change
@@ -121,9 +121,18 @@ pub fn create_random_array(
121121
},
122122
Utf8 => Arc::new(create_string_array::<i32>(size, null_density)),
123123
LargeUtf8 => Arc::new(create_string_array::<i64>(size, null_density)),
124+
Utf8View => Arc::new(create_string_view_array_with_len(
125+
size,
126+
null_density,
127+
4,
128+
false,
129+
)),
124130
Binary => Arc::new(create_binary_array::<i32>(size, null_density)),
125131
LargeBinary => Arc::new(create_binary_array::<i64>(size, null_density)),
126132
FixedSizeBinary(len) => Arc::new(create_fsb_array(size, null_density, *len as usize)),
133+
BinaryView => Arc::new(
134+
create_string_view_array_with_len(size, null_density, 4, false).to_binary_view(),
135+
),
127136
List(_) => create_random_list_array(field, size, null_density, true_density)?,
128137
LargeList(_) => create_random_list_array(field, size, null_density, true_density)?,
129138
Struct(fields) => Arc::new(StructArray::try_from(

parquet/benches/arrow_reader.rs

+98-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@ use criterion::{criterion_group, criterion_main, BenchmarkGroup, Criterion};
2323
use num::FromPrimitive;
2424
use num_bigint::BigInt;
2525
use parquet::arrow::array_reader::{
26-
make_byte_array_reader, make_fixed_len_byte_array_reader, ListArrayReader,
26+
make_byte_array_reader, make_byte_view_array_reader, make_fixed_len_byte_array_reader,
27+
ListArrayReader,
2728
};
2829
use parquet::basic::Type;
2930
use parquet::data_type::{ByteArray, FixedLenByteArrayType};
@@ -502,6 +503,13 @@ fn create_string_byte_array_reader(
502503
make_byte_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
503504
}
504505

506+
fn create_string_view_byte_array_reader(
507+
page_iterator: impl PageIterator + 'static,
508+
column_desc: ColumnDescPtr,
509+
) -> Box<dyn ArrayReader> {
510+
make_byte_view_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
511+
}
512+
505513
fn create_string_byte_array_dictionary_reader(
506514
page_iterator: impl PageIterator + 'static,
507515
column_desc: ColumnDescPtr,
@@ -993,6 +1001,95 @@ fn add_benches(c: &mut Criterion) {
9931001

9941002
group.finish();
9951003

1004+
// string view benchmarks
1005+
//==============================
1006+
1007+
let mut group = c.benchmark_group("arrow_array_reader/StringViewArray");
1008+
1009+
// string, plain encoded, no NULLs
1010+
let plain_string_no_null_data =
1011+
build_plain_encoded_string_page_iterator(mandatory_string_column_desc.clone(), 0.0);
1012+
group.bench_function("plain encoded, mandatory, no NULLs", |b| {
1013+
b.iter(|| {
1014+
let array_reader = create_string_view_byte_array_reader(
1015+
plain_string_no_null_data.clone(),
1016+
mandatory_string_column_desc.clone(),
1017+
);
1018+
count = bench_array_reader(array_reader);
1019+
});
1020+
assert_eq!(count, EXPECTED_VALUE_COUNT);
1021+
});
1022+
1023+
let plain_string_no_null_data =
1024+
build_plain_encoded_string_page_iterator(optional_string_column_desc.clone(), 0.0);
1025+
group.bench_function("plain encoded, optional, no NULLs", |b| {
1026+
b.iter(|| {
1027+
let array_reader = create_string_view_byte_array_reader(
1028+
plain_string_no_null_data.clone(),
1029+
optional_string_column_desc.clone(),
1030+
);
1031+
count = bench_array_reader(array_reader);
1032+
});
1033+
assert_eq!(count, EXPECTED_VALUE_COUNT);
1034+
});
1035+
1036+
// string, plain encoded, half NULLs
1037+
let plain_string_half_null_data =
1038+
build_plain_encoded_string_page_iterator(optional_string_column_desc.clone(), 0.5);
1039+
group.bench_function("plain encoded, optional, half NULLs", |b| {
1040+
b.iter(|| {
1041+
let array_reader = create_string_view_byte_array_reader(
1042+
plain_string_half_null_data.clone(),
1043+
optional_string_column_desc.clone(),
1044+
);
1045+
count = bench_array_reader(array_reader);
1046+
});
1047+
assert_eq!(count, EXPECTED_VALUE_COUNT);
1048+
});
1049+
1050+
// string, dictionary encoded, no NULLs
1051+
let dictionary_string_no_null_data =
1052+
build_dictionary_encoded_string_page_iterator(mandatory_string_column_desc.clone(), 0.0);
1053+
group.bench_function("dictionary encoded, mandatory, no NULLs", |b| {
1054+
b.iter(|| {
1055+
let array_reader = create_string_view_byte_array_reader(
1056+
dictionary_string_no_null_data.clone(),
1057+
mandatory_string_column_desc.clone(),
1058+
);
1059+
count = bench_array_reader(array_reader);
1060+
});
1061+
assert_eq!(count, EXPECTED_VALUE_COUNT);
1062+
});
1063+
1064+
let dictionary_string_no_null_data =
1065+
build_dictionary_encoded_string_page_iterator(optional_string_column_desc.clone(), 0.0);
1066+
group.bench_function("dictionary encoded, optional, no NULLs", |b| {
1067+
b.iter(|| {
1068+
let array_reader = create_string_view_byte_array_reader(
1069+
dictionary_string_no_null_data.clone(),
1070+
optional_string_column_desc.clone(),
1071+
);
1072+
count = bench_array_reader(array_reader);
1073+
});
1074+
assert_eq!(count, EXPECTED_VALUE_COUNT);
1075+
});
1076+
1077+
// string, dictionary encoded, half NULLs
1078+
let dictionary_string_half_null_data =
1079+
build_dictionary_encoded_string_page_iterator(optional_string_column_desc.clone(), 0.5);
1080+
group.bench_function("dictionary encoded, optional, half NULLs", |b| {
1081+
b.iter(|| {
1082+
let array_reader = create_string_view_byte_array_reader(
1083+
dictionary_string_half_null_data.clone(),
1084+
optional_string_column_desc.clone(),
1085+
);
1086+
count = bench_array_reader(array_reader);
1087+
});
1088+
assert_eq!(count, EXPECTED_VALUE_COUNT);
1089+
});
1090+
1091+
group.finish();
1092+
9961093
// list benchmarks
9971094
//==============================
9981095

parquet/benches/arrow_writer.rs

+34
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,24 @@ fn create_string_bench_batch(
9696
)?)
9797
}
9898

99+
fn create_string_and_binary_view_bench_batch(
100+
size: usize,
101+
null_density: f32,
102+
true_density: f32,
103+
) -> Result<RecordBatch> {
104+
let fields = vec![
105+
Field::new("_1", DataType::Utf8View, true),
106+
Field::new("_2", DataType::BinaryView, true),
107+
];
108+
let schema = Schema::new(fields);
109+
Ok(create_random_batch(
110+
Arc::new(schema),
111+
size,
112+
null_density,
113+
true_density,
114+
)?)
115+
}
116+
99117
fn create_string_dictionary_bench_batch(
100118
size: usize,
101119
null_density: f32,
@@ -395,6 +413,22 @@ fn bench_primitive_writer(c: &mut Criterion) {
395413
b.iter(|| write_batch_enable_bloom_filter(&batch).unwrap())
396414
});
397415

416+
// Benchmarks for writing a batch made of one `Utf8View` and one `BinaryView`
// column. The benchmark IDs name the view batch explicitly so its results are
// not reported under the plain "string" label.
let batch = create_string_and_binary_view_bench_batch(4096, 0.25, 0.75).unwrap();
// Report throughput relative to the in-memory size of all columns.
group.throughput(Throughput::Bytes(
    batch
        .columns()
        .iter()
        .map(|f| f.get_array_memory_size() as u64)
        .sum(),
));
group.bench_function("4096 values string and binary view", |b| {
    b.iter(|| write_batch(&batch).unwrap())
});

group.bench_function(
    "4096 values string and binary view with bloom filter",
    |b| b.iter(|| write_batch_enable_bloom_filter(&batch).unwrap()),
);
431+
398432
let batch = create_string_dictionary_bench_batch(4096, 0.25, 0.75).unwrap();
399433
group.throughput(Throughput::Bytes(
400434
batch

parquet/src/arrow/array_reader/builder.rs

+12-16
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use std::sync::Arc;
1919

2020
use arrow_schema::{DataType, Fields, SchemaBuilder};
2121

22+
use crate::arrow::array_reader::byte_array::make_byte_view_array_reader;
2223
use crate::arrow::array_reader::empty_array::make_empty_array_reader;
2324
use crate::arrow::array_reader::fixed_len_byte_array::make_fixed_len_byte_array_reader;
2425
use crate::arrow::array_reader::{
@@ -29,9 +30,7 @@ use crate::arrow::array_reader::{
2930
use crate::arrow::schema::{ParquetField, ParquetFieldType};
3031
use crate::arrow::ProjectionMask;
3132
use crate::basic::Type as PhysicalType;
32-
use crate::data_type::{
33-
BoolType, DoubleType, FloatType, Int32Type, Int64Type, Int96Type,
34-
};
33+
use crate::data_type::{BoolType, DoubleType, FloatType, Int32Type, Int64Type, Int96Type};
3534
use crate::errors::{ParquetError, Result};
3635
use crate::schema::types::{ColumnDescriptor, ColumnPath, Type};
3736

@@ -55,17 +54,13 @@ fn build_reader(
5554
row_groups: &dyn RowGroups,
5655
) -> Result<Option<Box<dyn ArrayReader>>> {
5756
match field.field_type {
58-
ParquetFieldType::Primitive { .. } => {
59-
build_primitive_reader(field, mask, row_groups)
60-
}
57+
ParquetFieldType::Primitive { .. } => build_primitive_reader(field, mask, row_groups),
6158
ParquetFieldType::Group { .. } => match &field.arrow_type {
6259
DataType::Map(_, _) => build_map_reader(field, mask, row_groups),
6360
DataType::Struct(_) => build_struct_reader(field, mask, row_groups),
6461
DataType::List(_) => build_list_reader(field, mask, false, row_groups),
6562
DataType::LargeList(_) => build_list_reader(field, mask, true, row_groups),
66-
DataType::FixedSizeList(_, _) => {
67-
build_fixed_size_list_reader(field, mask, row_groups)
68-
}
63+
DataType::FixedSizeList(_, _) => build_fixed_size_list_reader(field, mask, row_groups),
6964
d => unimplemented!("reading group type {} not implemented", d),
7065
},
7166
}
@@ -140,9 +135,9 @@ fn build_list_reader(
140135
DataType::List(f) => {
141136
DataType::List(Arc::new(f.as_ref().clone().with_data_type(item_type)))
142137
}
143-
DataType::LargeList(f) => DataType::LargeList(Arc::new(
144-
f.as_ref().clone().with_data_type(item_type),
145-
)),
138+
DataType::LargeList(f) => {
139+
DataType::LargeList(Arc::new(f.as_ref().clone().with_data_type(item_type)))
140+
}
146141
_ => unreachable!(),
147142
};
148143

@@ -289,6 +284,9 @@ fn build_primitive_reader(
289284
Some(DataType::Dictionary(_, _)) => {
290285
make_byte_array_dictionary_reader(page_iterator, column_desc, arrow_type)?
291286
}
287+
Some(DataType::Utf8View | DataType::BinaryView) => {
288+
make_byte_view_array_reader(page_iterator, column_desc, arrow_type)?
289+
}
292290
_ => make_byte_array_reader(page_iterator, column_desc, arrow_type)?,
293291
},
294292
PhysicalType::FIXED_LEN_BYTE_ARRAY => {
@@ -347,8 +345,7 @@ mod tests {
347345
#[test]
348346
fn test_create_array_reader() {
349347
let file = get_test_file("nulls.snappy.parquet");
350-
let file_reader: Arc<dyn FileReader> =
351-
Arc::new(SerializedFileReader::new(file).unwrap());
348+
let file_reader: Arc<dyn FileReader> = Arc::new(SerializedFileReader::new(file).unwrap());
352349

353350
let file_metadata = file_reader.metadata().file_metadata();
354351
let mask = ProjectionMask::leaves(file_metadata.schema_descr(), [0]);
@@ -359,8 +356,7 @@ mod tests {
359356
)
360357
.unwrap();
361358

362-
let array_reader =
363-
build_array_reader(fields.as_ref(), &mask, &file_reader).unwrap();
359+
let array_reader = build_array_reader(fields.as_ref(), &mask, &file_reader).unwrap();
364360

365361
// Create arrow types
366362
let arrow_type = DataType::Struct(Fields::from(vec![Field::new(

0 commit comments

Comments
 (0)