Skip to content

Commit 3ece7a7

Browse files
authored
Fix parquet statistics for ListingTable and Utf8View with schema_force_string_view, rename config option to schema_force_view_types (#12232)
* chore: move schema_force_string_view upwards to be listed with other reading props * refactor(12123): have file schema be merged on view types with table schema * test(12123): test for with, and without schema_force_string_view * test(12123): demonstrate current upstream failure when reading page stats * chore(12123): update config.md * chore: cleanup * chore(12123): temporarily remove test until next arrow release * chore(12123): rename all variables to force_view_types * refactor(12123): make interface ParquetFormat::with_force_view_types public * chore(12123): rename helper method which coerces the schema (not merging fields) * chore(12123): add docs to ParquetFormat to clarify exactly how the view types are used * test(12123): cleanup tests to be more explicit with ForceViews enum * test(12123): update tests to pass now that latest arrow-rs release is in * fix: use proper naming on benchmark
1 parent 8d2b240 commit 3ece7a7

File tree

21 files changed

+318
-95
lines changed

21 files changed

+318
-95
lines changed

benchmarks/src/clickbench.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ impl RunOpt {
121121
.options_mut()
122122
.execution
123123
.parquet
124-
.schema_force_string_view = self.common.string_view;
124+
.schema_force_view_types = self.common.force_view_types;
125125

126126
let ctx = SessionContext::new_with_config(config);
127127
self.register_hits(&ctx).await?;

benchmarks/src/tpch/run.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ impl RunOpt {
124124
.options_mut()
125125
.execution
126126
.parquet
127-
.schema_force_string_view = self.common.string_view;
127+
.schema_force_view_types = self.common.force_view_types;
128128
let ctx = SessionContext::new_with_config(config);
129129

130130
// register tables
@@ -345,7 +345,7 @@ mod tests {
345345
partitions: Some(2),
346346
batch_size: 8192,
347347
debug: false,
348-
string_view: false,
348+
force_view_types: false,
349349
};
350350
let opt = RunOpt {
351351
query: Some(query),
@@ -379,7 +379,7 @@ mod tests {
379379
partitions: Some(2),
380380
batch_size: 8192,
381381
debug: false,
382-
string_view: false,
382+
force_view_types: false,
383383
};
384384
let opt = RunOpt {
385385
query: Some(query),

benchmarks/src/util/options.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ pub struct CommonOpt {
4141
/// If true, will use StringView/BinaryViewArray instead of String/BinaryArray
4242
/// when reading ParquetFiles
4343
#[structopt(long)]
44-
pub string_view: bool,
44+
pub force_view_types: bool,
4545
}
4646

4747
impl CommonOpt {

datafusion/common/src/config.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -380,6 +380,10 @@ config_namespace! {
380380
/// the filters are applied in the same order as written in the query
381381
pub reorder_filters: bool, default = false
382382

383+
/// (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`,
384+
/// and `Binary/BinaryLarge` with `BinaryView`.
385+
pub schema_force_view_types: bool, default = false
386+
383387
// The following options affect writing to parquet files
384388
// and map to parquet::file::properties::WriterProperties
385389

@@ -483,10 +487,6 @@ config_namespace! {
483487
/// writing out already in-memory data, such as from a cached
484488
/// data frame.
485489
pub maximum_buffered_record_batches_per_stream: usize, default = 2
486-
487-
/// (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`,
488-
/// and `Binary/BinaryLarge` with `BinaryView`.
489-
pub schema_force_string_view: bool, default = false
490490
}
491491
}
492492

datafusion/common/src/file_options/parquet_writer.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ impl ParquetOptions {
175175
maximum_parallel_row_group_writers: _,
176176
maximum_buffered_record_batches_per_stream: _,
177177
bloom_filter_on_read: _, // reads not used for writer props
178-
schema_force_string_view: _,
178+
schema_force_view_types: _,
179179
} = self;
180180

181181
let mut builder = WriterProperties::builder()
@@ -441,7 +441,7 @@ mod tests {
441441
maximum_buffered_record_batches_per_stream: defaults
442442
.maximum_buffered_record_batches_per_stream,
443443
bloom_filter_on_read: defaults.bloom_filter_on_read,
444-
schema_force_string_view: defaults.schema_force_string_view,
444+
schema_force_view_types: defaults.schema_force_view_types,
445445
}
446446
}
447447

@@ -542,8 +542,7 @@ mod tests {
542542
maximum_buffered_record_batches_per_stream: global_options_defaults
543543
.maximum_buffered_record_batches_per_stream,
544544
bloom_filter_on_read: global_options_defaults.bloom_filter_on_read,
545-
schema_force_string_view: global_options_defaults
546-
.schema_force_string_view,
545+
schema_force_view_types: global_options_defaults.schema_force_view_types,
547546
},
548547
column_specific_options,
549548
key_value_metadata,

datafusion/core/src/datasource/file_format/mod.rs

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,51 @@ pub fn transform_schema_to_view(schema: &Schema) -> Schema {
229229
Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
230230
}
231231

232+
/// Coerces the file schema if the table schema uses a view type.
233+
pub(crate) fn coerce_file_schema_to_view_type(
234+
table_schema: &Schema,
235+
file_schema: &Schema,
236+
) -> Option<Schema> {
237+
let mut transform = false;
238+
let table_fields: HashMap<_, _> = table_schema
239+
.fields
240+
.iter()
241+
.map(|f| {
242+
let dt = f.data_type();
243+
if dt.equals_datatype(&DataType::Utf8View) {
244+
transform = true;
245+
}
246+
(f.name(), dt)
247+
})
248+
.collect();
249+
if !transform {
250+
return None;
251+
}
252+
253+
let transformed_fields: Vec<Arc<Field>> = file_schema
254+
.fields
255+
.iter()
256+
.map(
257+
|field| match (table_fields.get(field.name()), field.data_type()) {
258+
(Some(DataType::Utf8View), DataType::Utf8)
259+
| (Some(DataType::Utf8View), DataType::LargeUtf8) => Arc::new(
260+
Field::new(field.name(), DataType::Utf8View, field.is_nullable()),
261+
),
262+
(Some(DataType::BinaryView), DataType::Binary)
263+
| (Some(DataType::BinaryView), DataType::LargeBinary) => Arc::new(
264+
Field::new(field.name(), DataType::BinaryView, field.is_nullable()),
265+
),
266+
_ => field.clone(),
267+
},
268+
)
269+
.collect();
270+
271+
Some(Schema::new_with_metadata(
272+
transformed_fields,
273+
file_schema.metadata.clone(),
274+
))
275+
}
276+
232277
#[cfg(test)]
233278
pub(crate) mod test_util {
234279
use std::ops::Range;

0 commit comments

Comments (0)