Skip to content

Commit 2a5f22f

Browse files
committed
refactor: user may choose to dict-encode partition values
Let the user decide whether they want to dict-encode partition values for file-based data sources. Dictionary encoding makes sense for string values but is probably pointless or even counterproductive for integer types.
1 parent 8c34ca4 commit 2a5f22f

File tree

8 files changed

+221
-79
lines changed

8 files changed

+221
-79
lines changed

datafusion/core/src/datasource/listing/table.rs

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ use crate::datasource::{
4444
};
4545
use crate::logical_expr::TableProviderFilterPushDown;
4646
use crate::physical_plan;
47-
use crate::physical_plan::file_format::partition_type_wrap;
4847
use crate::{
4948
error::{DataFusionError, Result},
5049
execution::context::SessionState,
@@ -538,11 +537,7 @@ impl ListingTable {
538537
// Add the partition columns to the file schema
539538
let mut table_fields = file_schema.fields().clone();
540539
for (part_col_name, part_col_type) in &options.table_partition_cols {
541-
table_fields.push(Field::new(
542-
part_col_name,
543-
partition_type_wrap(part_col_type.clone()),
544-
false,
545-
));
540+
table_fields.push(Field::new(part_col_name, part_col_type.clone(), false));
546541
}
547542
let infinite_source = options.infinite_source;
548543

@@ -1012,10 +1007,7 @@ mod tests {
10121007

10131008
let opt = ListingOptions::new(Arc::new(AvroFormat {}))
10141009
.with_file_extension(FileType::AVRO.get_ext())
1015-
.with_table_partition_cols(vec![(
1016-
String::from("p1"),
1017-
partition_type_wrap(DataType::Utf8),
1018-
)])
1010+
.with_table_partition_cols(vec![(String::from("p1"), DataType::Utf8)])
10191011
.with_target_partitions(4);
10201012

10211013
let table_path = ListingTableUrl::parse("test:///table/").unwrap();

datafusion/core/src/datasource/listing_table_factory.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,15 @@ impl TableProviderFactory for ListingTableFactory {
8686
None,
8787
cmd.table_partition_cols
8888
.iter()
89-
.map(|x| (x.clone(), DataType::Utf8))
89+
.map(|x| {
90+
(
91+
x.clone(),
92+
DataType::Dictionary(
93+
Box::new(DataType::UInt16),
94+
Box::new(DataType::Utf8),
95+
),
96+
)
97+
})
9098
.collect::<Vec<_>>(),
9199
)
92100
} else {

datafusion/core/src/physical_plan/file_format/avro.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,6 @@ mod tests {
213213
use crate::datasource::listing::PartitionedFile;
214214
use crate::datasource::object_store::ObjectStoreUrl;
215215
use crate::physical_plan::file_format::chunked_store::ChunkedStore;
216-
use crate::physical_plan::file_format::partition_type_wrap;
217216
use crate::prelude::SessionContext;
218217
use crate::scalar::ScalarValue;
219218
use crate::test::object_store::local_unpartitioned_file;
@@ -409,10 +408,7 @@ mod tests {
409408
file_schema,
410409
statistics: Statistics::default(),
411410
limit: None,
412-
table_partition_cols: vec![(
413-
"date".to_owned(),
414-
partition_type_wrap(DataType::Utf8),
415-
)],
411+
table_partition_cols: vec![("date".to_owned(), DataType::Utf8)],
416412
output_ordering: None,
417413
infinite_source: false,
418414
});

datafusion/core/src/physical_plan/file_format/csv.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -326,7 +326,6 @@ mod tests {
326326
use super::*;
327327
use crate::datasource::file_format::file_type::FileType;
328328
use crate::physical_plan::file_format::chunked_store::ChunkedStore;
329-
use crate::physical_plan::file_format::partition_type_wrap;
330329
use crate::prelude::*;
331330
use crate::test::{partitioned_csv_config, partitioned_file_groups};
332331
use crate::test_util::{aggr_test_schema_with_missing_col, arrow_test_data};
@@ -580,8 +579,7 @@ mod tests {
580579
let mut config = partitioned_csv_config(file_schema, file_groups)?;
581580

582581
// Add partition columns
583-
config.table_partition_cols =
584-
vec![("date".to_owned(), partition_type_wrap(DataType::Utf8))];
582+
config.table_partition_cols = vec![("date".to_owned(), DataType::Utf8)];
585583
config.file_groups[0][0].partition_values =
586584
vec![ScalarValue::Utf8(Some("2021-10-26".to_owned()))];
587585

datafusion/core/src/physical_plan/file_format/mod.rs

Lines changed: 155 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@ pub use self::csv::CsvExec;
3030
pub(crate) use self::parquet::plan_to_parquet;
3131
pub use self::parquet::{ParquetExec, ParquetFileMetrics, ParquetFileReaderFactory};
3232
use arrow::{
33-
array::{ArrayData, ArrayRef, DictionaryArray},
33+
array::{ArrayData, ArrayRef, BufferBuilder, DictionaryArray},
3434
buffer::Buffer,
35-
datatypes::{DataType, Field, Schema, SchemaRef, UInt16Type},
35+
datatypes::{ArrowNativeType, DataType, Field, Schema, SchemaRef, UInt16Type},
3636
record_batch::RecordBatch,
3737
};
3838
pub use avro::AvroExec;
@@ -49,25 +49,21 @@ use crate::{
4949
error::{DataFusionError, Result},
5050
scalar::ScalarValue,
5151
};
52-
use arrow::array::{new_null_array, UInt16BufferBuilder};
52+
use arrow::array::new_null_array;
5353
use arrow::record_batch::RecordBatchOptions;
5454
use log::{debug, info};
5555
use object_store::path::Path;
5656
use object_store::ObjectMeta;
5757
use std::{
5858
collections::HashMap,
5959
fmt::{Display, Formatter, Result as FmtResult},
60+
marker::PhantomData,
6061
sync::Arc,
6162
vec,
6263
};
6364

6465
use super::{ColumnStatistics, Statistics};
6566

66-
/// Convert logical type of partition column to physical type: `Dictionary(UInt16, val_type)`
67-
pub fn partition_type_wrap(val_type: DataType) -> DataType {
68-
DataType::Dictionary(Box::new(DataType::UInt16), Box::new(val_type))
69-
}
70-
7167
/// The base configurations to provide when creating a physical plan for
7268
/// any given file format.
7369
#[derive(Debug, Clone)]
@@ -346,7 +342,7 @@ struct PartitionColumnProjector {
346342
/// An Arrow buffer initialized to zeros that represents the key array of all partition
347343
/// columns (partition columns are materialized by dictionary arrays with only one
348344
/// value in the dictionary, thus all the keys are equal to zero).
349-
key_buffer_cache: Option<Buffer>,
345+
key_buffer_cache: ZeroBufferGenerators,
350346
/// Mapping between the indexes in the list of partition columns and the target
351347
/// schema. Sorted by index in the target schema so that we can iterate on it to
352348
/// insert the partition columns in the target record batch.
@@ -372,7 +368,7 @@ impl PartitionColumnProjector {
372368

373369
Self {
374370
projected_partition_indexes,
375-
key_buffer_cache: None,
371+
key_buffer_cache: Default::default(),
376372
projected_schema,
377373
}
378374
}
@@ -400,7 +396,7 @@ impl PartitionColumnProjector {
400396
for &(pidx, sidx) in &self.projected_partition_indexes {
401397
cols.insert(
402398
sidx,
403-
create_dict_array(
399+
create_output_array(
404400
&mut self.key_buffer_cache,
405401
&partition_values[pidx],
406402
file_batch.num_rows(),
@@ -411,26 +407,60 @@ impl PartitionColumnProjector {
411407
}
412408
}
413409

414-
fn create_dict_array(
415-
key_buffer_cache: &mut Option<Buffer>,
416-
val: &ScalarValue,
417-
len: usize,
418-
) -> ArrayRef {
419-
// build value dictionary
420-
let dict_vals = val.to_array();
421-
422-
// build keys array
423-
let sliced_key_buffer = match key_buffer_cache {
424-
Some(buf) if buf.len() >= len * 2 => buf.slice(buf.len() - len * 2),
425-
_ => {
426-
let mut key_buffer_builder = UInt16BufferBuilder::new(len * 2);
427-
key_buffer_builder.advance(len * 2); // keys are all 0
428-
key_buffer_cache.insert(key_buffer_builder.finish()).clone()
410+
#[derive(Debug, Default)]
411+
struct ZeroBufferGenerators {
412+
gen_i8: ZeroBufferGenerator<i8>,
413+
gen_i16: ZeroBufferGenerator<i16>,
414+
gen_i32: ZeroBufferGenerator<i32>,
415+
gen_i64: ZeroBufferGenerator<i64>,
416+
gen_u8: ZeroBufferGenerator<u8>,
417+
gen_u16: ZeroBufferGenerator<u16>,
418+
gen_u32: ZeroBufferGenerator<u32>,
419+
gen_u64: ZeroBufferGenerator<u64>,
420+
}
421+
422+
/// Generate an Arrow [`Buffer`] that contains zero values.
423+
#[derive(Debug, Default)]
424+
struct ZeroBufferGenerator<T>
425+
where
426+
T: ArrowNativeType,
427+
{
428+
cache: Option<Buffer>,
429+
_t: PhantomData<T>,
430+
}
431+
432+
impl<T> ZeroBufferGenerator<T>
433+
where
434+
T: ArrowNativeType,
435+
{
436+
const SIZE: usize = std::mem::size_of::<T>();
437+
438+
fn get_buffer(&mut self, n_vals: usize) -> Buffer {
439+
match &mut self.cache {
440+
Some(buf) if buf.len() >= n_vals * Self::SIZE => {
441+
buf.slice_with_length(0, n_vals * Self::SIZE)
442+
}
443+
_ => {
444+
let mut key_buffer_builder = BufferBuilder::<T>::new(n_vals);
445+
key_buffer_builder.advance(n_vals); // keys are all 0
446+
self.cache.insert(key_buffer_builder.finish()).clone()
447+
}
429448
}
430-
};
449+
}
450+
}
431451

432-
// create data type
433-
let data_type = partition_type_wrap(val.get_datatype());
452+
fn create_dict_array<T>(
453+
buffer_gen: &mut ZeroBufferGenerator<T>,
454+
dict_val: &ScalarValue,
455+
len: usize,
456+
data_type: DataType,
457+
) -> ArrayRef
458+
where
459+
T: ArrowNativeType,
460+
{
461+
let dict_vals = dict_val.to_array();
462+
463+
let sliced_key_buffer = buffer_gen.get_buffer(len);
434464

435465
// assemble pieces together
436466
let mut builder = ArrayData::builder(data_type)
@@ -442,6 +472,84 @@ fn create_dict_array(
442472
))
443473
}
444474

475+
fn create_output_array(
476+
key_buffer_cache: &mut ZeroBufferGenerators,
477+
val: &ScalarValue,
478+
len: usize,
479+
) -> ArrayRef {
480+
if let ScalarValue::Dictionary(key_type, dict_val) = &val {
481+
match key_type.as_ref() {
482+
DataType::Int8 => {
483+
return create_dict_array(
484+
&mut key_buffer_cache.gen_i8,
485+
dict_val,
486+
len,
487+
val.get_datatype(),
488+
);
489+
}
490+
DataType::Int16 => {
491+
return create_dict_array(
492+
&mut key_buffer_cache.gen_i16,
493+
dict_val,
494+
len,
495+
val.get_datatype(),
496+
);
497+
}
498+
DataType::Int32 => {
499+
return create_dict_array(
500+
&mut key_buffer_cache.gen_i32,
501+
dict_val,
502+
len,
503+
val.get_datatype(),
504+
);
505+
}
506+
DataType::Int64 => {
507+
return create_dict_array(
508+
&mut key_buffer_cache.gen_i64,
509+
dict_val,
510+
len,
511+
val.get_datatype(),
512+
);
513+
}
514+
DataType::UInt8 => {
515+
return create_dict_array(
516+
&mut key_buffer_cache.gen_u8,
517+
dict_val,
518+
len,
519+
val.get_datatype(),
520+
);
521+
}
522+
DataType::UInt16 => {
523+
return create_dict_array(
524+
&mut key_buffer_cache.gen_u16,
525+
dict_val,
526+
len,
527+
val.get_datatype(),
528+
);
529+
}
530+
DataType::UInt32 => {
531+
return create_dict_array(
532+
&mut key_buffer_cache.gen_u32,
533+
dict_val,
534+
len,
535+
val.get_datatype(),
536+
);
537+
}
538+
DataType::UInt64 => {
539+
return create_dict_array(
540+
&mut key_buffer_cache.gen_u64,
541+
dict_val,
542+
len,
543+
val.get_datatype(),
544+
);
545+
}
546+
_ => {}
547+
}
548+
}
549+
550+
val.to_array_of_size(len)
551+
}
552+
445553
/// A single file or part of a file that should be read, along with its schema, statistics
446554
pub struct FileMeta {
447555
/// Path for the file (e.g. URL, filesystem path, etc)
@@ -670,9 +778,9 @@ mod tests {
670778
// file_batch is ok here because we kept all the file cols in the projection
671779
file_batch,
672780
&[
673-
ScalarValue::Utf8(Some("2021".to_owned())),
674-
ScalarValue::Utf8(Some("10".to_owned())),
675-
ScalarValue::Utf8(Some("26".to_owned())),
781+
partition_value_wrap(ScalarValue::Utf8(Some("2021".to_owned()))),
782+
partition_value_wrap(ScalarValue::Utf8(Some("10".to_owned()))),
783+
partition_value_wrap(ScalarValue::Utf8(Some("26".to_owned()))),
676784
],
677785
)
678786
.expect("Projection of partition columns into record batch failed");
@@ -698,9 +806,9 @@ mod tests {
698806
// file_batch is ok here because we kept all the file cols in the projection
699807
file_batch,
700808
&[
701-
ScalarValue::Utf8(Some("2021".to_owned())),
702-
ScalarValue::Utf8(Some("10".to_owned())),
703-
ScalarValue::Utf8(Some("27".to_owned())),
809+
partition_value_wrap(ScalarValue::Utf8(Some("2021".to_owned()))),
810+
partition_value_wrap(ScalarValue::Utf8(Some("10".to_owned()))),
811+
partition_value_wrap(ScalarValue::Utf8(Some("27".to_owned()))),
704812
],
705813
)
706814
.expect("Projection of partition columns into record batch failed");
@@ -728,9 +836,9 @@ mod tests {
728836
// file_batch is ok here because we kept all the file cols in the projection
729837
file_batch,
730838
&[
731-
ScalarValue::Utf8(Some("2021".to_owned())),
732-
ScalarValue::Utf8(Some("10".to_owned())),
733-
ScalarValue::Utf8(Some("28".to_owned())),
839+
partition_value_wrap(ScalarValue::Utf8(Some("2021".to_owned()))),
840+
partition_value_wrap(ScalarValue::Utf8(Some("10".to_owned()))),
841+
partition_value_wrap(ScalarValue::Utf8(Some("28".to_owned()))),
734842
],
735843
)
736844
.expect("Projection of partition columns into record batch failed");
@@ -862,4 +970,13 @@ mod tests {
862970
extensions: None,
863971
}
864972
}
973+
974+
/// Convert logical type of partition column to physical type: `Dictionary(UInt16, val_type)`
975+
fn partition_type_wrap(val_type: DataType) -> DataType {
976+
DataType::Dictionary(Box::new(DataType::UInt16), Box::new(val_type))
977+
}
978+
979+
fn partition_value_wrap(val: ScalarValue) -> ScalarValue {
980+
ScalarValue::Dictionary(Box::new(DataType::UInt16), Box::new(val))
981+
}
865982
}

0 commit comments

Comments
 (0)