Skip to content

Commit 8b5dbd7

Browse files
crepererumalamb
andauthored
Support arbitrary user defined partition column in ListingTable (rather than assuming they are always Dictionary encoded) (#5545)
* refactor: user may choose to dict-encode partition values Let the user decide if they may want to encode partition values for file-based data sources. Dictionary encoding makes sense for string values but is probably pointless or even counterproductive for integer types. * refactor: improve transition * fix: remove leftover dbg * refactor: `partition_{type,value}_wrap` -> `wrap_partition_{type,value}_in_dict` --------- Co-authored-by: Andrew Lamb <[email protected]>
1 parent 37138a5 commit 8b5dbd7

File tree

9 files changed

+329
-84
lines changed

9 files changed

+329
-84
lines changed

datafusion/core/src/datasource/listing/mod.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,16 @@ pub struct FileRange {
5555
pub struct PartitionedFile {
5656
/// Path for the file (e.g. URL, filesystem path, etc)
5757
pub object_meta: ObjectMeta,
58-
/// Values of partition columns to be appended to each row
58+
/// Values of partition columns to be appended to each row.
59+
///
60+
/// These MUST have the same count, order, and type than the [`table_partition_cols`].
61+
///
62+
/// You may use [`wrap_partition_value_in_dict`] to wrap them if you have used [`wrap_partition_type_in_dict`] to wrap the column type.
63+
///
64+
///
65+
/// [`wrap_partition_type_in_dict`]: crate::physical_plan::file_format::wrap_partition_type_in_dict
66+
/// [`wrap_partition_value_in_dict`]: crate::physical_plan::file_format::wrap_partition_value_in_dict
67+
/// [`table_partition_cols`]: table::ListingOptions::table_partition_cols
5968
pub partition_values: Vec<ScalarValue>,
6069
/// An optional file range for a more fine-grained parallel execution
6170
pub range: Option<FileRange>,

datafusion/core/src/datasource/listing/table.rs

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ use crate::datasource::{
4444
};
4545
use crate::logical_expr::TableProviderFilterPushDown;
4646
use crate::physical_plan;
47-
use crate::physical_plan::file_format::partition_type_wrap;
4847
use crate::{
4948
error::{DataFusionError, Result},
5049
execution::context::SessionState,
@@ -300,6 +299,8 @@ impl ListingOptions {
300299

301300
/// Set table partition column names on [`ListingOptions`] and returns self.
302301
///
302+
/// You may use [`wrap_partition_type_in_dict`] to request a dictionary-encoded type.
303+
///
303304
/// ```
304305
/// # use std::sync::Arc;
305306
/// # use arrow::datatypes::DataType;
@@ -315,6 +316,9 @@ impl ListingOptions {
315316
/// assert_eq!(listing_options.table_partition_cols, vec![("col_a".to_string(), DataType::Utf8),
316317
/// ("col_b".to_string(), DataType::Utf8)]);
317318
/// ```
319+
///
320+
///
321+
/// [`wrap_partition_type_in_dict`]: crate::physical_plan::file_format::wrap_partition_type_in_dict
318322
pub fn with_table_partition_cols(
319323
mut self,
320324
table_partition_cols: Vec<(String, DataType)>,
@@ -538,11 +542,7 @@ impl ListingTable {
538542
// Add the partition columns to the file schema
539543
let mut table_fields = file_schema.fields().clone();
540544
for (part_col_name, part_col_type) in &options.table_partition_cols {
541-
table_fields.push(Field::new(
542-
part_col_name,
543-
partition_type_wrap(part_col_type.clone()),
544-
false,
545-
));
545+
table_fields.push(Field::new(part_col_name, part_col_type.clone(), false));
546546
}
547547
let infinite_source = options.infinite_source;
548548

@@ -1012,10 +1012,7 @@ mod tests {
10121012

10131013
let opt = ListingOptions::new(Arc::new(AvroFormat {}))
10141014
.with_file_extension(FileType::AVRO.get_ext())
1015-
.with_table_partition_cols(vec![(
1016-
String::from("p1"),
1017-
partition_type_wrap(DataType::Utf8),
1018-
)])
1015+
.with_table_partition_cols(vec![(String::from("p1"), DataType::Utf8)])
10191016
.with_target_partitions(4);
10201017

10211018
let table_path = ListingTableUrl::parse("test:///table/").unwrap();

datafusion/core/src/datasource/listing_table_factory.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,15 @@ impl TableProviderFactory for ListingTableFactory {
8686
None,
8787
cmd.table_partition_cols
8888
.iter()
89-
.map(|x| (x.clone(), DataType::Utf8))
89+
.map(|x| {
90+
(
91+
x.clone(),
92+
DataType::Dictionary(
93+
Box::new(DataType::UInt16),
94+
Box::new(DataType::Utf8),
95+
),
96+
)
97+
})
9098
.collect::<Vec<_>>(),
9199
)
92100
} else {

datafusion/core/src/physical_plan/file_format/avro.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,6 @@ mod tests {
213213
use crate::datasource::listing::PartitionedFile;
214214
use crate::datasource::object_store::ObjectStoreUrl;
215215
use crate::physical_plan::file_format::chunked_store::ChunkedStore;
216-
use crate::physical_plan::file_format::partition_type_wrap;
217216
use crate::prelude::SessionContext;
218217
use crate::scalar::ScalarValue;
219218
use crate::test::object_store::local_unpartitioned_file;
@@ -409,10 +408,7 @@ mod tests {
409408
file_schema,
410409
statistics: Statistics::default(),
411410
limit: None,
412-
table_partition_cols: vec![(
413-
"date".to_owned(),
414-
partition_type_wrap(DataType::Utf8),
415-
)],
411+
table_partition_cols: vec![("date".to_owned(), DataType::Utf8)],
416412
output_ordering: None,
417413
infinite_source: false,
418414
});

datafusion/core/src/physical_plan/file_format/csv.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -326,7 +326,6 @@ mod tests {
326326
use super::*;
327327
use crate::datasource::file_format::file_type::FileType;
328328
use crate::physical_plan::file_format::chunked_store::ChunkedStore;
329-
use crate::physical_plan::file_format::partition_type_wrap;
330329
use crate::prelude::*;
331330
use crate::test::{partitioned_csv_config, partitioned_file_groups};
332331
use crate::test_util::{aggr_test_schema_with_missing_col, arrow_test_data};
@@ -580,8 +579,7 @@ mod tests {
580579
let mut config = partitioned_csv_config(file_schema, file_groups)?;
581580

582581
// Add partition columns
583-
config.table_partition_cols =
584-
vec![("date".to_owned(), partition_type_wrap(DataType::Utf8))];
582+
config.table_partition_cols = vec![("date".to_owned(), DataType::Utf8)];
585583
config.file_groups[0][0].partition_values =
586584
vec![ScalarValue::Utf8(Some("2021-10-26".to_owned()))];
587585

0 commit comments

Comments
 (0)