Commit a591301

Authored by alamb, XiangpengHao, and a10y
Merge string-view2 branch: reading from parquet up to 2x faster for some ClickBench queries (not on by default) (#11667)
This merge squashes the following changes:

* Pin to pre-release version of arrow 52.2.0
* Update for deprecated method
* Add a config to force using string view in benchmark (#11514)
* Add String view helper functions (#11517)
* Add ArrowBytesViewMap and ArrowBytesViewSet (#11515)
* Update `string-view` branch to arrow-rs main (#10966)
* Enable `GroupValueBytesView` for aggregation with StringView types (#11519)
* Initial support for regex_replace on `StringViewArray` (#11556)
* Add support for Utf8View for date/temporal codepaths (#11518)
* GC `StringViewArray` in `CoalesceBatchesStream` (#11587)
* [Bug] fix bug in return type inference of `utf8_to_int_type` (#11662)
* Fix clippy
* Increase ByteViewMap block size to 2MB (#11674)
* Change `--string-view` to only apply to parquet formats (#11663)
* Implement native StringView support for character length (#11676)
* Remove unneeded patches
* cargo fmt

Co-authored-by: Andrew Lamb <[email protected]>
Co-authored-by: Xiangpeng Hao <[email protected]>
Co-authored-by: Andrew Duffy <[email protected]>
1 parent ea8c287 commit a591301

40 files changed: +1714 −192 lines


benchmarks/src/clickbench.rs

Lines changed: 7 additions & 1 deletion
```diff
@@ -116,7 +116,13 @@ impl RunOpt {
             None => queries.min_query_id()..=queries.max_query_id(),
         };

-        let config = self.common.config();
+        let mut config = self.common.config();
+        config
+            .options_mut()
+            .execution
+            .parquet
+            .schema_force_string_view = self.common.string_view;
+
         let ctx = SessionContext::new_with_config(config);
         self.register_hits(&ctx).await?;
```

benchmarks/src/tpch/run.rs

Lines changed: 7 additions & 0 deletions
```diff
@@ -120,6 +120,11 @@ impl RunOpt {
             .config()
             .with_collect_statistics(!self.disable_statistics);
         config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join;
+        config
+            .options_mut()
+            .execution
+            .parquet
+            .schema_force_string_view = self.common.string_view;
         let ctx = SessionContext::new_with_config(config);

         // register tables
@@ -339,6 +344,7 @@ mod tests {
             partitions: Some(2),
             batch_size: 8192,
             debug: false,
+            string_view: false,
         };
         let opt = RunOpt {
             query: Some(query),
@@ -372,6 +378,7 @@ mod tests {
             partitions: Some(2),
             batch_size: 8192,
             debug: false,
+            string_view: false,
         };
         let opt = RunOpt {
             query: Some(query),
```

benchmarks/src/util/options.rs

Lines changed: 5 additions & 0 deletions
```diff
@@ -37,6 +37,11 @@ pub struct CommonOpt {
     /// Activate debug mode to see more details
     #[structopt(short, long)]
     pub debug: bool,
+
+    /// If true, will use StringView/BinaryViewArray instead of String/BinaryArray
+    /// when reading ParquetFiles
+    #[structopt(long)]
+    pub string_view: bool,
 }

 impl CommonOpt {
```

datafusion-cli/Cargo.lock

Lines changed: 3 additions & 2 deletions
(Generated file; diff not rendered by default.)

datafusion/common/src/cast.rs

Lines changed: 11 additions & 0 deletions
```diff
@@ -36,6 +36,7 @@ use arrow::{
     },
     datatypes::{ArrowDictionaryKeyType, ArrowPrimitiveType},
 };
+use arrow_array::{BinaryViewArray, StringViewArray};

 // Downcast ArrayRef to Date32Array
 pub fn as_date32_array(array: &dyn Array) -> Result<&Date32Array> {
@@ -87,6 +88,11 @@ pub fn as_string_array(array: &dyn Array) -> Result<&StringArray> {
     Ok(downcast_value!(array, StringArray))
 }

+// Downcast ArrayRef to StringViewArray
+pub fn as_string_view_array(array: &dyn Array) -> Result<&StringViewArray> {
+    Ok(downcast_value!(array, StringViewArray))
+}
+
 // Downcast ArrayRef to UInt32Array
 pub fn as_uint32_array(array: &dyn Array) -> Result<&UInt32Array> {
     Ok(downcast_value!(array, UInt32Array))
@@ -221,6 +227,11 @@ pub fn as_binary_array(array: &dyn Array) -> Result<&BinaryArray> {
     Ok(downcast_value!(array, BinaryArray))
 }

+// Downcast ArrayRef to BinaryViewArray
+pub fn as_binary_view_array(array: &dyn Array) -> Result<&BinaryViewArray> {
+    Ok(downcast_value!(array, BinaryViewArray))
+}
+
 // Downcast ArrayRef to FixedSizeListArray
 pub fn as_fixed_size_list_array(array: &dyn Array) -> Result<&FixedSizeListArray> {
     Ok(downcast_value!(array, FixedSizeListArray))
```

datafusion/common/src/config.rs

Lines changed: 4 additions & 0 deletions
```diff
@@ -469,6 +469,10 @@ config_namespace! {
         /// writing out already in-memory data, such as from a cached
         /// data frame.
         pub maximum_buffered_record_batches_per_stream: usize, default = 2
+
+        /// (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`,
+        /// and `Binary/BinaryLarge` with `BinaryView`.
+        pub schema_force_string_view: bool, default = false
     }
 }
```

datafusion/common/src/file_options/parquet_writer.rs

Lines changed: 4 additions & 0 deletions
```diff
@@ -175,6 +175,7 @@ impl ParquetOptions {
             maximum_parallel_row_group_writers: _,
             maximum_buffered_record_batches_per_stream: _,
             bloom_filter_on_read: _, // reads not used for writer props
+            schema_force_string_view: _,
         } = self;

         let mut builder = WriterProperties::builder()
@@ -440,6 +441,7 @@ mod tests {
             maximum_buffered_record_batches_per_stream: defaults
                 .maximum_buffered_record_batches_per_stream,
             bloom_filter_on_read: defaults.bloom_filter_on_read,
+            schema_force_string_view: defaults.schema_force_string_view,
         }
     }

@@ -540,6 +542,8 @@ mod tests {
                 maximum_buffered_record_batches_per_stream: global_options_defaults
                     .maximum_buffered_record_batches_per_stream,
                 bloom_filter_on_read: global_options_defaults.bloom_filter_on_read,
+                schema_force_string_view: global_options_defaults
+                    .schema_force_string_view,
             },
             column_specific_options,
             key_value_metadata,
```

datafusion/common/src/hash_utils.rs

Lines changed: 110 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,9 @@ use arrow_buffer::IntervalMonthDayNano;
3131

3232
#[cfg(not(feature = "force_hash_collisions"))]
3333
use crate::cast::{
34-
as_boolean_array, as_fixed_size_list_array, as_generic_binary_array,
35-
as_large_list_array, as_list_array, as_map_array, as_primitive_array,
36-
as_string_array, as_struct_array,
34+
as_binary_view_array, as_boolean_array, as_fixed_size_list_array,
35+
as_generic_binary_array, as_large_list_array, as_list_array, as_map_array,
36+
as_primitive_array, as_string_array, as_string_view_array, as_struct_array,
3737
};
3838
use crate::error::Result;
3939
#[cfg(not(feature = "force_hash_collisions"))]
@@ -415,8 +415,10 @@ pub fn create_hashes<'a>(
415415
DataType::Null => hash_null(random_state, hashes_buffer, rehash),
416416
DataType::Boolean => hash_array(as_boolean_array(array)?, random_state, hashes_buffer, rehash),
417417
DataType::Utf8 => hash_array(as_string_array(array)?, random_state, hashes_buffer, rehash),
418+
DataType::Utf8View => hash_array(as_string_view_array(array)?, random_state, hashes_buffer, rehash),
418419
DataType::LargeUtf8 => hash_array(as_largestring_array(array), random_state, hashes_buffer, rehash),
419420
DataType::Binary => hash_array(as_generic_binary_array::<i32>(array)?, random_state, hashes_buffer, rehash),
421+
DataType::BinaryView => hash_array(as_binary_view_array(array)?, random_state, hashes_buffer, rehash),
420422
DataType::LargeBinary => hash_array(as_generic_binary_array::<i64>(array)?, random_state, hashes_buffer, rehash),
421423
DataType::FixedSizeBinary(_) => {
422424
let array: &FixedSizeBinaryArray = array.as_any().downcast_ref().unwrap();
@@ -540,22 +542,57 @@ mod tests {
540542
Ok(())
541543
}
542544

543-
#[test]
544-
fn create_hashes_binary() -> Result<()> {
545-
let byte_array = Arc::new(BinaryArray::from_vec(vec![
546-
&[4, 3, 2],
547-
&[4, 3, 2],
548-
&[1, 2, 3],
549-
]));
545+
macro_rules! create_hash_binary {
546+
($NAME:ident, $ARRAY:ty) => {
547+
#[cfg(not(feature = "force_hash_collisions"))]
548+
#[test]
549+
fn $NAME() {
550+
let binary = [
551+
Some(b"short".to_byte_slice()),
552+
None,
553+
Some(b"long but different 12 bytes string"),
554+
Some(b"short2"),
555+
Some(b"Longer than 12 bytes string"),
556+
Some(b"short"),
557+
Some(b"Longer than 12 bytes string"),
558+
];
559+
560+
let binary_array = Arc::new(binary.iter().cloned().collect::<$ARRAY>());
561+
let ref_array = Arc::new(binary.iter().cloned().collect::<BinaryArray>());
562+
563+
let random_state = RandomState::with_seeds(0, 0, 0, 0);
564+
565+
let mut binary_hashes = vec![0; binary.len()];
566+
create_hashes(&[binary_array], &random_state, &mut binary_hashes)
567+
.unwrap();
568+
569+
let mut ref_hashes = vec![0; binary.len()];
570+
create_hashes(&[ref_array], &random_state, &mut ref_hashes).unwrap();
571+
572+
// Null values result in a zero hash,
573+
for (val, hash) in binary.iter().zip(binary_hashes.iter()) {
574+
match val {
575+
Some(_) => assert_ne!(*hash, 0),
576+
None => assert_eq!(*hash, 0),
577+
}
578+
}
550579

551-
let random_state = RandomState::with_seeds(0, 0, 0, 0);
552-
let hashes_buff = &mut vec![0; byte_array.len()];
553-
let hashes = create_hashes(&[byte_array], &random_state, hashes_buff)?;
554-
assert_eq!(hashes.len(), 3,);
580+
// same logical values should hash to the same hash value
581+
assert_eq!(binary_hashes, ref_hashes);
555582

556-
Ok(())
583+
// Same values should map to same hash values
584+
assert_eq!(binary[0], binary[5]);
585+
assert_eq!(binary[4], binary[6]);
586+
587+
// different binary should map to different hash values
588+
assert_ne!(binary[0], binary[2]);
589+
}
590+
};
557591
}
558592

593+
create_hash_binary!(binary_array, BinaryArray);
594+
create_hash_binary!(binary_view_array, BinaryViewArray);
595+
559596
#[test]
560597
fn create_hashes_fixed_size_binary() -> Result<()> {
561598
let input_arg = vec![vec![1, 2], vec![5, 6], vec![5, 6]];
@@ -571,6 +608,64 @@ mod tests {
571608
Ok(())
572609
}
573610

611+
macro_rules! create_hash_string {
612+
($NAME:ident, $ARRAY:ty) => {
613+
#[cfg(not(feature = "force_hash_collisions"))]
614+
#[test]
615+
fn $NAME() {
616+
let strings = [
617+
Some("short"),
618+
None,
619+
Some("long but different 12 bytes string"),
620+
Some("short2"),
621+
Some("Longer than 12 bytes string"),
622+
Some("short"),
623+
Some("Longer than 12 bytes string"),
624+
];
625+
626+
let string_array = Arc::new(strings.iter().cloned().collect::<$ARRAY>());
627+
let dict_array = Arc::new(
628+
strings
629+
.iter()
630+
.cloned()
631+
.collect::<DictionaryArray<Int8Type>>(),
632+
);
633+
634+
let random_state = RandomState::with_seeds(0, 0, 0, 0);
635+
636+
let mut string_hashes = vec![0; strings.len()];
637+
create_hashes(&[string_array], &random_state, &mut string_hashes)
638+
.unwrap();
639+
640+
let mut dict_hashes = vec![0; strings.len()];
641+
create_hashes(&[dict_array], &random_state, &mut dict_hashes).unwrap();
642+
643+
// Null values result in a zero hash,
644+
for (val, hash) in strings.iter().zip(string_hashes.iter()) {
645+
match val {
646+
Some(_) => assert_ne!(*hash, 0),
647+
None => assert_eq!(*hash, 0),
648+
}
649+
}
650+
651+
// same logical values should hash to the same hash value
652+
assert_eq!(string_hashes, dict_hashes);
653+
654+
// Same values should map to same hash values
655+
assert_eq!(strings[0], strings[5]);
656+
assert_eq!(strings[4], strings[6]);
657+
658+
// different strings should map to different hash values
659+
assert_ne!(strings[0], strings[2]);
660+
}
661+
};
662+
}
663+
664+
create_hash_string!(string_array, StringArray);
665+
create_hash_string!(large_string_array, LargeStringArray);
666+
create_hash_string!(string_view_array, StringArray);
667+
create_hash_string!(dict_string_array, DictionaryArray<Int8Type>);
668+
574669
#[test]
575670
// Tests actual values of hashes, which are different if forcing collisions
576671
#[cfg(not(feature = "force_hash_collisions"))]
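The invariant these macro-generated tests check is that logically equal values hash identically regardless of physical layout (plain, view, or dictionary-encoded arrays). A minimal standalone illustration of that invariant using only the standard library (DataFusion itself uses ahash's `RandomState`, not `DefaultHasher`; this sketch shows the property, not the implementation):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Hash raw bytes; only the byte content (never the container layout)
// feeds the hasher.
fn hash_bytes(bytes: &[u8]) -> u64 {
    let mut hasher = DefaultHasher::new();
    bytes.hash(&mut hasher);
    hasher.finish()
}

fn main() {
    // "short" would fit inline in a view array, while the longer string
    // spills to a data buffer. Either way, equal bytes must hash equally.
    assert_eq!(hash_bytes(b"short"), hash_bytes(&b"short".to_vec()));
    assert_ne!(hash_bytes(b"short"), hash_bytes(b"Longer than 12 bytes string"));
}
```

This is why a `StringViewArray` and a `StringArray` holding the same strings can join and group together correctly.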

datafusion/core/src/datasource/file_format/mod.rs

Lines changed: 23 additions & 0 deletions
```diff
@@ -42,6 +42,7 @@ use crate::error::Result;
 use crate::execution::context::SessionState;
 use crate::physical_plan::{ExecutionPlan, Statistics};

+use arrow_schema::{DataType, Field, Schema};
 use datafusion_common::file_options::file_type::FileType;
 use datafusion_common::{internal_err, not_impl_err, GetExt};
 use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
@@ -204,6 +205,28 @@ pub fn file_type_to_format(
     }
 }

+/// Transform a schema to use view types for Utf8 and Binary
+pub fn transform_schema_to_view(schema: &Schema) -> Schema {
+    let transformed_fields: Vec<Arc<Field>> = schema
+        .fields
+        .iter()
+        .map(|field| match field.data_type() {
+            DataType::Utf8 | DataType::LargeUtf8 => Arc::new(Field::new(
+                field.name(),
+                DataType::Utf8View,
+                field.is_nullable(),
+            )),
+            DataType::Binary | DataType::LargeBinary => Arc::new(Field::new(
+                field.name(),
+                DataType::BinaryView,
+                field.is_nullable(),
+            )),
+            _ => field.clone(),
+        })
+        .collect();
+    Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
+}
+
 #[cfg(test)]
 pub(crate) mod test_util {
     use std::ops::Range;
```
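The mapping `transform_schema_to_view` performs is purely type-level: string and binary fields swap to their view counterparts, everything else passes through unchanged. A self-contained sketch of the same rule, with a toy enum standing in for Arrow's `DataType` so it runs without any dependencies:

```rust
// Toy stand-in for arrow_schema::DataType, covering only the variants
// the transformation rule touches plus one pass-through example.
#[derive(Debug, Clone, PartialEq)]
enum DataType {
    Utf8,
    LargeUtf8,
    Utf8View,
    Binary,
    LargeBinary,
    BinaryView,
    Int64,
}

// Same rule as transform_schema_to_view, reduced to the type mapping:
// Utf8/LargeUtf8 -> Utf8View, Binary/LargeBinary -> BinaryView, else unchanged.
fn to_view_type(dt: &DataType) -> DataType {
    match dt {
        DataType::Utf8 | DataType::LargeUtf8 => DataType::Utf8View,
        DataType::Binary | DataType::LargeBinary => DataType::BinaryView,
        other => other.clone(),
    }
}

fn main() {
    assert_eq!(to_view_type(&DataType::Utf8), DataType::Utf8View);
    assert_eq!(to_view_type(&DataType::LargeUtf8), DataType::Utf8View);
    assert_eq!(to_view_type(&DataType::Binary), DataType::BinaryView);
    assert_eq!(to_view_type(&DataType::LargeBinary), DataType::BinaryView);
    assert_eq!(to_view_type(&DataType::Int64), DataType::Int64);
}
```

Note that the real function also preserves each field's name, nullability, and the schema metadata, as the diff shows.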

datafusion/core/src/datasource/file_format/parquet.rs

Lines changed: 12 additions & 1 deletion
```diff
@@ -24,7 +24,7 @@ use std::sync::Arc;

 use super::write::demux::start_demuxer_task;
 use super::write::{create_writer, SharedBuffer};
-use super::{FileFormat, FileFormatFactory, FileScanConfig};
+use super::{transform_schema_to_view, FileFormat, FileFormatFactory, FileScanConfig};
 use crate::arrow::array::RecordBatch;
 use crate::arrow::datatypes::{Fields, Schema, SchemaRef};
 use crate::datasource::file_format::file_compression_type::FileCompressionType;
@@ -316,6 +316,17 @@ impl FileFormat for ParquetFormat {
             Schema::try_merge(schemas)
         }?;

+        let schema = if state
+            .config_options()
+            .execution
+            .parquet
+            .schema_force_string_view
+        {
+            transform_schema_to_view(&schema)
+        } else {
+            schema
+        };
+
         Ok(Arc::new(schema))
     }
```
datafusion/core/src/datasource/listing/table.rs

Lines changed: 3 additions & 1 deletion
```diff
@@ -410,7 +410,9 @@ impl ListingOptions {
             .try_collect()
             .await?;

-        self.format.infer_schema(state, &store, &files).await
+        let schema = self.format.infer_schema(state, &store, &files).await?;
+
+        Ok(schema)
     }

     /// Infers the partition columns stored in `LOCATION` and compares
```

datafusion/core/src/datasource/physical_plan/parquet/mod.rs

Lines changed: 4 additions & 0 deletions
```diff
@@ -711,6 +711,10 @@ impl ExecutionPlan for ParquetExec {
             enable_page_index: self.enable_page_index(),
             enable_bloom_filter: self.bloom_filter_on_read(),
             schema_adapter_factory,
+            schema_force_string_view: self
+                .table_parquet_options
+                .global
+                .schema_force_string_view,
         };

         let stream =
```
