Skip to content

Allow DynamicFileCatalog support to query partitioned file #12683

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions datafusion/core/src/datasource/dynamic_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,18 @@ impl UrlTableFactory for DynamicListTableFactory {
.ok_or_else(|| plan_datafusion_err!("get current SessionStore error"))?;

match ListingTableConfig::new(table_url.clone())
.infer(state)
.infer_options(state)
.await
{
Ok(cfg) => ListingTable::try_new(cfg)
.map(|table| Some(Arc::new(table) as Arc<dyn TableProvider>)),
Ok(cfg) => {
let cfg = cfg
.infer_partitions_from_path(state)
.await?
.infer_schema(state)
.await?;
ListingTable::try_new(cfg)
.map(|table| Some(Arc::new(table) as Arc<dyn TableProvider>))
}
Err(_) => Ok(None),
}
}
Expand Down
36 changes: 34 additions & 2 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::datasource::{
};
use crate::execution::context::SessionState;
use datafusion_catalog::TableProvider;
use datafusion_common::{DataFusionError, Result};
use datafusion_common::{config_err, DataFusionError, Result};
use datafusion_expr::dml::InsertOp;
use datafusion_expr::{utils::conjunction, Expr, TableProviderFilterPushDown};
use datafusion_expr::{SortExpr, TableType};
Expand Down Expand Up @@ -192,6 +192,38 @@ impl ListingTableConfig {
pub async fn infer(self, state: &SessionState) -> Result<Self> {
    // Step 1: resolve the `ListingOptions`; any failure is returned to the caller.
    let with_options = self.infer_options(state).await?;
    // Step 2: with the options in place, resolve the schema and return the result.
    with_options.infer_schema(state).await
}

/// Infer the partition columns from the path. Requires `self.options` to be set prior to using.
///
/// Only the first entry of `self.table_paths` is inspected. Each partition
/// column name reported by `ListingOptions::infer_partitions` is paired with
/// the type `Dictionary(UInt16, Utf8)` and handed to
/// `ListingOptions::with_table_partition_cols`; `table_paths` and
/// `file_schema` are carried over unchanged.
///
/// # Errors
///
/// Returns a configuration error if `self.options` is `None` or if
/// `self.table_paths` is empty, and propagates any error returned while
/// inferring the partitions.
pub async fn infer_partitions_from_path(self, state: &SessionState) -> Result<Self> {
    // Guard: partition inference is driven by the listing options.
    let Some(options) = self.options else {
        return config_err!("No `ListingOptions` set for inferring partitions");
    };
    // Guard: there must be at least one table path to inspect.
    let Some(url) = self.table_paths.first() else {
        return config_err!("No table path found");
    };

    // Attach the dictionary-encoded string type to every inferred column name.
    let partitions = options
        .infer_partitions(state, url)
        .await?
        .into_iter()
        .map(|col_name| {
            (
                col_name,
                DataType::Dictionary(
                    Box::new(DataType::UInt16),
                    Box::new(DataType::Utf8),
                ),
            )
        })
        .collect::<Vec<_>>();

    let options = options.with_table_partition_cols(partitions);
    Ok(Self {
        table_paths: self.table_paths,
        file_schema: self.file_schema,
        options: Some(options),
    })
}
}

/// Options for creating a [`ListingTable`]
Expand Down Expand Up @@ -505,7 +537,7 @@ impl ListingOptions {
/// Infer the partitioning at the given path on the provided object store.
/// For performance reasons, it doesn't read all the files on disk
/// and therefore may fail to detect invalid partitioning.
async fn infer_partitions(
pub(crate) async fn infer_partitions(
&self,
state: &SessionState,
table_path: &ListingTableUrl,
Expand Down
167 changes: 164 additions & 3 deletions datafusion/sqllogictest/test_files/dynamic_file.slt
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,170 @@ SELECT * FROM '../core/tests/data/partitioned_table_arrow/part=123' ORDER BY f0;
1 foo true
2 bar false

# dynamic file query doesn't support partitioned table
statement error DataFusion error: Error during planning: table 'datafusion.public.../core/tests/data/partitioned_table_arrow' not found
SELECT * FROM '../core/tests/data/partitioned_table_arrow' ORDER BY f0;
Comment on lines -28 to -30
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's what I mentioned in #12671 (comment)
I think it isn't a dynamic file catalog issue, so I removed it from the test.

# Read partitioned file
statement ok
CREATE TABLE src_table_1 (
int_col INT,
string_col TEXT,
bigint_col BIGINT,
partition_col INT
) AS VALUES
(1, 'aaa', 100, 1),
(2, 'bbb', 200, 1),
(3, 'ccc', 300, 1),
(4, 'ddd', 400, 1);

statement ok
CREATE TABLE src_table_2 (
int_col INT,
string_col TEXT,
bigint_col BIGINT,
partition_col INT
) AS VALUES
(5, 'eee', 500, 2),
(6, 'fff', 600, 2),
(7, 'ggg', 700, 2),
(8, 'hhh', 800, 2);

# Read partitioned csv file

query I
COPY src_table_1 TO 'test_files/scratch/dynamic_file/csv_partitions'
STORED AS CSV
PARTITIONED BY (partition_col);
----
4

query I
COPY src_table_2 TO 'test_files/scratch/dynamic_file/csv_partitions'
STORED AS CSV
PARTITIONED BY (partition_col);
----
4

query ITIT rowsort
SELECT int_col, string_col, bigint_col, partition_col FROM 'test_files/scratch/dynamic_file/csv_partitions';
----
1 aaa 100 1
2 bbb 200 1
3 ccc 300 1
4 ddd 400 1
5 eee 500 2
6 fff 600 2
7 ggg 700 2
8 hhh 800 2

# Read partitioned json file

query I
COPY src_table_1 TO 'test_files/scratch/dynamic_file/json_partitions'
STORED AS JSON
PARTITIONED BY (partition_col);
----
4

query I
COPY src_table_2 TO 'test_files/scratch/dynamic_file/json_partitions'
STORED AS JSON
PARTITIONED BY (partition_col);
----
4

query ITIT rowsort
SELECT int_col, string_col, bigint_col, partition_col FROM 'test_files/scratch/dynamic_file/json_partitions';
----
1 aaa 100 1
2 bbb 200 1
3 ccc 300 1
4 ddd 400 1
5 eee 500 2
6 fff 600 2
7 ggg 700 2
8 hhh 800 2

# Read partitioned arrow file

query I
COPY src_table_1 TO 'test_files/scratch/dynamic_file/arrow_partitions'
STORED AS ARROW
PARTITIONED BY (partition_col);
----
4

query I
COPY src_table_2 TO 'test_files/scratch/dynamic_file/arrow_partitions'
STORED AS ARROW
PARTITIONED BY (partition_col);
----
4

query ITIT rowsort
SELECT int_col, string_col, bigint_col, partition_col FROM 'test_files/scratch/dynamic_file/arrow_partitions';
----
1 aaa 100 1
2 bbb 200 1
3 ccc 300 1
4 ddd 400 1
5 eee 500 2
6 fff 600 2
7 ggg 700 2
8 hhh 800 2

# Read partitioned parquet file

query I
COPY src_table_1 TO 'test_files/scratch/dynamic_file/parquet_partitions'
STORED AS PARQUET
PARTITIONED BY (partition_col);
----
4

query I
COPY src_table_2 TO 'test_files/scratch/dynamic_file/parquet_partitions'
STORED AS PARQUET
PARTITIONED BY (partition_col);
----
4

query ITIT rowsort
select * from 'test_files/scratch/dynamic_file/parquet_partitions';
----
1 aaa 100 1
2 bbb 200 1
3 ccc 300 1
4 ddd 400 1
5 eee 500 2
6 fff 600 2
7 ggg 700 2
8 hhh 800 2

# Read partitioned parquet file with multiple partition columns

query I
COPY src_table_1 TO 'test_files/scratch/dynamic_file/nested_partition'
STORED AS PARQUET
PARTITIONED BY (partition_col, string_col);
----
4

query I
COPY src_table_2 TO 'test_files/scratch/dynamic_file/nested_partition'
STORED AS PARQUET
PARTITIONED BY (partition_col, string_col);
----
4

query IITT rowsort
select * from 'test_files/scratch/dynamic_file/nested_partition';
----
1 100 1 aaa
2 200 1 bbb
3 300 1 ccc
4 400 1 ddd
5 500 2 eee
6 600 2 fff
7 700 2 ggg
8 800 2 hhh

# read avro file
query IT
Expand Down