Skip to content

Commit a29f2be

Browse files
Validate partitions columns in CREATE EXTERNAL TABLE if table already exists. (#9912)
* prevent panic * initial version, bad code * some error handling * Some slt tests * docs and minor refactors * cleaning up * fix tests * clear err message for single-file partitioned tables * typo * test invalid/mixed partitions on disk * ensure order in error msg for testing
1 parent 27bdf3c commit a29f2be

File tree

5 files changed

+212
-4
lines changed

5 files changed

+212
-4
lines changed

datafusion/core/src/datasource/listing/table.rs

+107
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ use datafusion_physical_expr::{
6161

6262
use async_trait::async_trait;
6363
use futures::{future, stream, StreamExt, TryStreamExt};
64+
use itertools::Itertools;
6465
use object_store::ObjectStore;
6566

6667
/// Configuration for creating a [`ListingTable`]
@@ -438,6 +439,112 @@ impl ListingOptions {
438439

439440
self.format.infer_schema(state, &store, &files).await
440441
}
442+
443+
/// Infers the partition columns stored in `LOCATION` and compares
444+
/// them with the columns provided in `PARTITIONED BY` to help prevent
445+
/// accidental corrupts of partitioned tables.
446+
///
447+
/// Allows specifying partial partitions.
448+
pub async fn validate_partitions(
449+
&self,
450+
state: &SessionState,
451+
table_path: &ListingTableUrl,
452+
) -> Result<()> {
453+
if self.table_partition_cols.is_empty() {
454+
return Ok(());
455+
}
456+
457+
if !table_path.is_collection() {
458+
return plan_err!(
459+
"Can't create a partitioned table backed by a single file, \
460+
perhaps the URL is missing a trailing slash?"
461+
);
462+
}
463+
464+
let inferred = self.infer_partitions(state, table_path).await?;
465+
466+
// no partitioned files found on disk
467+
if inferred.is_empty() {
468+
return Ok(());
469+
}
470+
471+
let table_partition_names = self
472+
.table_partition_cols
473+
.iter()
474+
.map(|(col_name, _)| col_name.clone())
475+
.collect_vec();
476+
477+
if inferred.len() < table_partition_names.len() {
478+
return plan_err!(
479+
"Inferred partitions to be {:?}, but got {:?}",
480+
inferred,
481+
table_partition_names
482+
);
483+
}
484+
485+
// match prefix to allow creating tables with partial partitions
486+
for (idx, col) in table_partition_names.iter().enumerate() {
487+
if &inferred[idx] != col {
488+
return plan_err!(
489+
"Inferred partitions to be {:?}, but got {:?}",
490+
inferred,
491+
table_partition_names
492+
);
493+
}
494+
}
495+
496+
Ok(())
497+
}
498+
499+
/// Infer the partitioning at the given path on the provided object store.
500+
/// For performance reasons, it doesn't read all the files on disk
501+
/// and therefore may fail to detect invalid partitioning.
502+
async fn infer_partitions(
503+
&self,
504+
state: &SessionState,
505+
table_path: &ListingTableUrl,
506+
) -> Result<Vec<String>> {
507+
let store = state.runtime_env().object_store(table_path)?;
508+
509+
// only use 10 files for inference
510+
// This can fail to detect inconsistent partition keys
511+
// A DFS traversal approach of the store can help here
512+
let files: Vec<_> = table_path
513+
.list_all_files(state, store.as_ref(), &self.file_extension)
514+
.await?
515+
.take(10)
516+
.try_collect()
517+
.await?;
518+
519+
let stripped_path_parts = files.iter().map(|file| {
520+
table_path
521+
.strip_prefix(&file.location)
522+
.unwrap()
523+
.collect_vec()
524+
});
525+
526+
let partition_keys = stripped_path_parts
527+
.map(|path_parts| {
528+
path_parts
529+
.into_iter()
530+
.rev()
531+
.skip(1) // get parents only; skip the file itself
532+
.rev()
533+
.map(|s| s.split('=').take(1).collect())
534+
.collect_vec()
535+
})
536+
.collect_vec();
537+
538+
match partition_keys.into_iter().all_equal_value() {
539+
Ok(v) => Ok(v),
540+
Err(None) => Ok(vec![]),
541+
Err(Some(diff)) => {
542+
let mut sorted_diff = [diff.0, diff.1];
543+
sorted_diff.sort();
544+
plan_err!("Found mixed partition values on disk {:?}", sorted_diff)
545+
}
546+
}
547+
}
441548
}
442549

443550
/// Reads data from one or more files via an

datafusion/core/src/datasource/listing_table_factory.rs

+2
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,8 @@ impl TableProviderFactory for ListingTableFactory {
137137
.with_table_partition_cols(table_partition_cols)
138138
.with_file_sort_order(cmd.order_exprs.clone());
139139

140+
options.validate_partitions(state, &table_path).await?;
141+
140142
let resolved_schema = match provided_schema {
141143
None => options.infer_schema(state, &table_path).await?,
142144
Some(s) => s,

datafusion/core/src/datasource/physical_plan/file_scan_config.rs

+10-2
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use arrow::datatypes::{ArrowNativeType, UInt16Type};
3232
use arrow_array::{ArrayRef, DictionaryArray, RecordBatch, RecordBatchOptions};
3333
use arrow_schema::{DataType, Field, Schema, SchemaRef};
3434
use datafusion_common::stats::Precision;
35-
use datafusion_common::{exec_err, ColumnStatistics, Statistics};
35+
use datafusion_common::{exec_err, ColumnStatistics, DataFusionError, Statistics};
3636
use datafusion_physical_expr::LexOrdering;
3737

3838
use log::warn;
@@ -256,9 +256,17 @@ impl PartitionColumnProjector {
256256
file_batch.columns().len()
257257
);
258258
}
259+
259260
let mut cols = file_batch.columns().to_vec();
260261
for &(pidx, sidx) in &self.projected_partition_indexes {
261-
let mut partition_value = Cow::Borrowed(&partition_values[pidx]);
262+
let p_value =
263+
partition_values
264+
.get(pidx)
265+
.ok_or(DataFusionError::Execution(
266+
"Invalid partitioning found on disk".to_string(),
267+
))?;
268+
269+
let mut partition_value = Cow::Borrowed(p_value);
262270

263271
// check if user forgot to dict-encode the partition value
264272
let field = self.projected_schema.field(sidx);

datafusion/core/src/execution/context/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -1113,7 +1113,7 @@ impl SessionContext {
11131113
table_ref: impl Into<TableReference>,
11141114
provider: Arc<dyn TableProvider>,
11151115
) -> Result<Option<Arc<dyn TableProvider>>> {
1116-
let table_ref = table_ref.into();
1116+
let table_ref: TableReference = table_ref.into();
11171117
let table = table_ref.table().to_owned();
11181118
self.state
11191119
.read()

datafusion/sqllogictest/test_files/create_external_table.slt

+92-1
Original file line numberDiff line numberDiff line change
@@ -113,4 +113,95 @@ statement error DataFusion error: Invalid or Unsupported Configuration: Config v
113113
CREATE EXTERNAL TABLE csv_table (column1 int)
114114
STORED AS CSV
115115
LOCATION 'foo.csv'
116-
OPTIONS ('format.delimiter' ';', 'format.column_index_truncate_length' '123')
116+
OPTIONS ('format.delimiter' ';', 'format.column_index_truncate_length' '123')
117+
118+
# Partitioned table on a single file
119+
query error DataFusion error: Error during planning: Can't create a partitioned table backed by a single file, perhaps the URL is missing a trailing slash\?
120+
CREATE EXTERNAL TABLE single_file_partition(c1 int)
121+
PARTITIONED BY (p2 string, p1 string)
122+
STORED AS CSV
123+
LOCATION 'foo.csv';
124+
125+
# Wrong partition order error
126+
127+
statement ok
128+
CREATE EXTERNAL TABLE partitioned (c1 int)
129+
PARTITIONED BY (p1 string, p2 string)
130+
STORED AS parquet
131+
LOCATION 'test_files/scratch/create_external_table/bad_partitioning/';
132+
133+
query ITT
134+
INSERT INTO partitioned VALUES (1, 'x', 'y');
135+
----
136+
1
137+
138+
query error DataFusion error: Error during planning: Inferred partitions to be \["p1", "p2"\], but got \["p2", "p1"\]
139+
CREATE EXTERNAL TABLE wrong_order_partitioned (c1 int)
140+
PARTITIONED BY (p2 string, p1 string)
141+
STORED AS parquet
142+
LOCATION 'test_files/scratch/create_external_table/bad_partitioning/';
143+
144+
statement error DataFusion error: Error during planning: Inferred partitions to be \["p1", "p2"\], but got \["p2"\]
145+
CREATE EXTERNAL TABLE wrong_order_partitioned (c1 int)
146+
PARTITIONED BY (p2 string)
147+
STORED AS parquet
148+
LOCATION 'test_files/scratch/create_external_table/bad_partitioning/';
149+
150+
# But allows partial partition selection
151+
152+
statement ok
153+
CREATE EXTERNAL TABLE partial_partitioned (c1 int)
154+
PARTITIONED BY (p1 string)
155+
STORED AS parquet
156+
LOCATION 'test_files/scratch/create_external_table/bad_partitioning/';
157+
158+
query IT
159+
SELECT * FROM partial_partitioned;
160+
----
161+
1 x
162+
163+
statement ok
164+
CREATE EXTERNAL TABLE inner_partition (c1 int)
165+
PARTITIONED BY (p2 string)
166+
STORED AS parquet
167+
LOCATION 'test_files/scratch/create_external_table/bad_partitioning/p1=x/';
168+
169+
query IT
170+
SELECT * FROM inner_partition;
171+
----
172+
1 y
173+
174+
# Simulate manual creation of invalid (mixed) partitions on disk
175+
176+
statement ok
177+
CREATE EXTERNAL TABLE test(name string)
178+
PARTITIONED BY (year string, month string)
179+
STORED AS parquet
180+
LOCATION 'test_files/scratch/create_external_table/manual_partitioning/';
181+
182+
statement ok
183+
-- passes the partition check since the previous statement didn't write to disk
184+
CREATE EXTERNAL TABLE test2(name string)
185+
PARTITIONED BY (month string, year string)
186+
STORED AS parquet
187+
LOCATION 'test_files/scratch/create_external_table/manual_partitioning/';
188+
189+
query TTT
190+
-- creates year -> month partitions
191+
INSERT INTO test VALUES('name', '2024', '03');
192+
----
193+
1
194+
195+
query TTT
196+
-- creates month -> year partitions.
197+
-- now table have both partitions (year -> month and month -> year)
198+
INSERT INTO test2 VALUES('name', '2024', '03');
199+
----
200+
1
201+
202+
statement error DataFusion error: Error during planning: Found mixed partition values on disk \[\["month", "year"\], \["year", "month"\]\]
203+
-- fails to infer as partitions are not consistent
204+
CREATE EXTERNAL TABLE test3(name string)
205+
PARTITIONED BY (month string, year string)
206+
STORED AS parquet
207+
LOCATION 'test_files/scratch/create_external_table/manual_partitioning/';

0 commit comments

Comments
 (0)