
Commit 470b3a8

goldmedal authored and alamb committed
introduce binary_as_string parquet option
1 parent 128dd14 commit 470b3a8

File tree

16 files changed: +511 -174 lines


benchmarks/src/clickbench.rs

+9 -6

@@ -115,12 +115,15 @@ impl RunOpt {
             None => queries.min_query_id()..=queries.max_query_id(),
         };

+        // configure parquet options
         let mut config = self.common.config();
-        config
-            .options_mut()
-            .execution
-            .parquet
-            .schema_force_view_types = self.common.force_view_types;
+        {
+            let parquet_options = &mut config.options_mut().execution.parquet;
+            parquet_options.schema_force_view_types = self.common.force_view_types;
+            // The hits_partitioned dataset specifies string columns
+            // as binary due to how it was written. Force it to strings
+            parquet_options.binary_as_string = true;
+        }

         let ctx = SessionContext::new_with_config(config);
         self.register_hits(&ctx).await?;

@@ -148,7 +151,7 @@ impl RunOpt {
         Ok(())
     }

-    /// Registrs the `hits.parquet` as a table named `hits`
+    /// Registers the `hits.parquet` as a table named `hits`
     async fn register_hits(&self, ctx: &SessionContext) -> Result<()> {
         let options = Default::default();
         let path = self.path.as_os_str().to_str().unwrap();
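
A side note on the new block: scoping `parquet_options` keeps the mutable borrow of `config` contained before `config` is moved into `SessionContext::new_with_config`, and avoids repeating the long `options_mut().execution.parquet` chain for each field.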

datafusion/common/src/config.rs

+8

@@ -390,6 +390,14 @@ config_namespace! {
    /// and `Binary/BinaryLarge` with `BinaryView`.
    pub schema_force_view_types: bool, default = false

+    /// (reading) If true, parquet reader will read columns of
+    /// `Binary/LargeBinary` with `Utf8`, and `BinaryView` with `Utf8View`.
+    ///
+    /// Parquet files generated by some legacy writers do not correctly set
+    /// the UTF8 flag for strings, causing string columns to be loaded as
+    /// BLOB instead.
+    pub binary_as_string: bool, default = false
+
    // The following options affect writing to parquet files
    // and map to parquet::file::properties::WriterProperties
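
Because the flag lives in the `execution.parquet` config namespace, it can be enabled per session without touching any `ParquetFormat` builder. A minimal sketch of how a user might opt in, assuming the standard `SessionConfig`/`SessionContext` API (the `hits.parquet` path is a placeholder, not from this commit):

use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    // Read Binary/LargeBinary/BinaryView parquet columns as strings
    let mut config = SessionConfig::new();
    config.options_mut().execution.parquet.binary_as_string = true;

    let ctx = SessionContext::new_with_config(config);
    // Columns whose writer forgot the UTF8 flag should now be
    // inferred as Utf8 instead of Binary
    ctx.register_parquet("hits", "hits.parquet", ParquetReadOptions::default())
        .await?;
    ctx.sql("SELECT * FROM hits LIMIT 10").await?.show().await?;
    Ok(())
}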

datafusion/common/src/file_options/parquet_writer.rs

+3

@@ -176,6 +176,7 @@ impl ParquetOptions {
             maximum_buffered_record_batches_per_stream: _,
             bloom_filter_on_read: _, // reads not used for writer props
             schema_force_view_types: _,
+            binary_as_string: _, // not used for writer props
         } = self;

         let mut builder = WriterProperties::builder()

@@ -442,6 +443,7 @@ mod tests {
                 .maximum_buffered_record_batches_per_stream,
             bloom_filter_on_read: defaults.bloom_filter_on_read,
             schema_force_view_types: defaults.schema_force_view_types,
+            binary_as_string: defaults.binary_as_string,
         }
     }

@@ -543,6 +545,7 @@ mod tests {
                 .maximum_buffered_record_batches_per_stream,
             bloom_filter_on_read: global_options_defaults.bloom_filter_on_read,
             schema_force_view_types: global_options_defaults.schema_force_view_types,
+            binary_as_string: global_options_defaults.binary_as_string,
         },
         column_specific_options,
         key_value_metadata,
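
The exhaustive destructuring in `let Self { ... } = self` is what pulls this file into the commit: any new field on `ParquetOptions` must be listed there or the conversion to `WriterProperties` fails to compile, so a read-only option like `binary_as_string` has to be explicitly ignored.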

datafusion/core/src/datasource/file_format/mod.rs

+95 -17

@@ -42,7 +42,7 @@ use crate::error::Result;
 use crate::execution::context::SessionState;
 use crate::physical_plan::{ExecutionPlan, Statistics};

-use arrow_schema::{DataType, Field, Schema};
+use arrow_schema::{DataType, Field, FieldRef, Schema};
 use datafusion_common::file_options::file_type::FileType;
 use datafusion_common::{internal_err, not_impl_err, GetExt};
 use datafusion_expr::Expr;

@@ -235,20 +235,26 @@ pub fn file_type_to_format(
     }
 }

+/// Create a new field with the specified data type, copying the other
+/// properties from the input field
+fn field_with_new_type(field: &FieldRef, new_type: DataType) -> FieldRef {
+    Arc::new(field.as_ref().clone().with_data_type(new_type))
+}
+
 /// Transform a schema to use view types for Utf8 and Binary
+///
+/// See [parquet::ParquetFormat::force_view_types] for details
 pub fn transform_schema_to_view(schema: &Schema) -> Schema {
     let transformed_fields: Vec<Arc<Field>> = schema
         .fields
         .iter()
         .map(|field| match field.data_type() {
-            DataType::Utf8 | DataType::LargeUtf8 => Arc::new(
-                Field::new(field.name(), DataType::Utf8View, field.is_nullable())
-                    .with_metadata(field.metadata().to_owned()),
-            ),
-            DataType::Binary | DataType::LargeBinary => Arc::new(
-                Field::new(field.name(), DataType::BinaryView, field.is_nullable())
-                    .with_metadata(field.metadata().to_owned()),
-            ),
+            DataType::Utf8 | DataType::LargeUtf8 => {
+                field_with_new_type(field, DataType::Utf8View)
+            }
+            DataType::Binary | DataType::LargeBinary => {
+                field_with_new_type(field, DataType::BinaryView)
+            }
             _ => field.clone(),
         })
         .collect();

@@ -274,6 +280,7 @@ pub(crate) fn coerce_file_schema_to_view_type(
             (f.name(), dt)
         })
         .collect();
+
     if !transform {
         return None;
     }

@@ -283,14 +290,13 @@ pub(crate) fn coerce_file_schema_to_view_type(
         .iter()
         .map(
             |field| match (table_fields.get(field.name()), field.data_type()) {
-                (Some(DataType::Utf8View), DataType::Utf8)
-                | (Some(DataType::Utf8View), DataType::LargeUtf8) => Arc::new(
-                    Field::new(field.name(), DataType::Utf8View, field.is_nullable()),
-                ),
-                (Some(DataType::BinaryView), DataType::Binary)
-                | (Some(DataType::BinaryView), DataType::LargeBinary) => Arc::new(
-                    Field::new(field.name(), DataType::BinaryView, field.is_nullable()),
-                ),
+                (Some(DataType::Utf8View), DataType::Utf8 | DataType::LargeUtf8) => {
+                    field_with_new_type(field, DataType::Utf8View)
+                }
+                (
+                    Some(DataType::BinaryView),
+                    DataType::Binary | DataType::LargeBinary,
+                ) => field_with_new_type(field, DataType::BinaryView),
                 _ => field.clone(),
             },
         )

@@ -302,6 +308,78 @@ pub(crate) fn coerce_file_schema_to_view_type(
     ))
 }

+/// Transform a schema so that any binary types are strings
+pub fn transform_binary_to_string(schema: &Schema) -> Schema {
+    let transformed_fields: Vec<Arc<Field>> = schema
+        .fields
+        .iter()
+        .map(|field| match field.data_type() {
+            DataType::Binary => field_with_new_type(field, DataType::Utf8),
+            DataType::LargeBinary => field_with_new_type(field, DataType::LargeUtf8),
+            DataType::BinaryView => field_with_new_type(field, DataType::Utf8View),
+            _ => field.clone(),
+        })
+        .collect();
+    Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
+}
+
+/// If the table schema uses a string type, coerce the file schema to use a string type.
+///
+/// See [parquet::ParquetFormat::binary_as_string] for details
+pub(crate) fn coerce_file_schema_to_string_type(
+    table_schema: &Schema,
+    file_schema: &Schema,
+) -> Option<Schema> {
+    let mut transform = false;
+    let table_fields: HashMap<_, _> = table_schema
+        .fields
+        .iter()
+        .map(|f| (f.name(), f.data_type()))
+        .collect();
+    let transformed_fields: Vec<Arc<Field>> = file_schema
+        .fields
+        .iter()
+        .map(
+            |field| match (table_fields.get(field.name()), field.data_type()) {
+                // table schema uses string type, coerce the file schema to use string type
+                (
+                    Some(DataType::Utf8),
+                    DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
+                ) => {
+                    transform = true;
+                    field_with_new_type(field, DataType::Utf8)
+                }
+                // table schema uses large string type, coerce the file schema to use large string type
+                (
+                    Some(DataType::LargeUtf8),
+                    DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
+                ) => {
+                    transform = true;
+                    field_with_new_type(field, DataType::LargeUtf8)
+                }
+                // table schema uses string view type, coerce the file schema to use view type
+                (
+                    Some(DataType::Utf8View),
+                    DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
+                ) => {
+                    transform = true;
+                    field_with_new_type(field, DataType::Utf8View)
+                }
+                _ => field.clone(),
+            },
+        )
+        .collect();
+
+    if !transform {
+        None
+    } else {
+        Some(Schema::new_with_metadata(
+            transformed_fields,
+            file_schema.metadata.clone(),
+        ))
+    }
+}
+
 #[cfg(test)]
 pub(crate) mod test_util {
     use std::ops::Range;
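
For intuition, the new `transform_binary_to_string` helper changes only the data types; field names, nullability, and metadata are carried over by `field_with_new_type`. A minimal sketch of the expected behavior (the column names are hypothetical, not from the commit):

use arrow_schema::{DataType, Field, Schema};
use datafusion::datasource::file_format::transform_binary_to_string;

fn main() {
    // A file schema whose writer left string columns typed as binary
    let file_schema = Schema::new(vec![
        Field::new("url", DataType::Binary, true),
        Field::new("body", DataType::LargeBinary, true),
        Field::new("id", DataType::Int64, false),
    ]);

    let transformed = transform_binary_to_string(&file_schema);
    assert_eq!(transformed.field(0).data_type(), &DataType::Utf8); // Binary -> Utf8
    assert_eq!(transformed.field(1).data_type(), &DataType::LargeUtf8); // LargeBinary -> LargeUtf8
    assert_eq!(transformed.field(2).data_type(), &DataType::Int64); // non-binary fields untouched
}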

datafusion/core/src/datasource/file_format/parquet.rs

+32 -5

@@ -26,8 +26,9 @@ use std::sync::Arc;
 use super::write::demux::start_demuxer_task;
 use super::write::{create_writer, SharedBuffer};
 use super::{
-    coerce_file_schema_to_view_type, transform_schema_to_view, FileFormat,
-    FileFormatFactory, FilePushdownSupport, FileScanConfig,
+    coerce_file_schema_to_string_type, coerce_file_schema_to_view_type,
+    transform_binary_to_string, transform_schema_to_view, FileFormat, FileFormatFactory,
+    FilePushdownSupport, FileScanConfig,
 };
 use crate::arrow::array::RecordBatch;
 use crate::arrow::datatypes::{Fields, Schema, SchemaRef};

@@ -253,13 +254,29 @@ impl ParquetFormat {
         self.options.global.schema_force_view_types
     }

-    /// If true, will use view types (StringView and BinaryView).
-    ///
-    /// Refer to [`Self::force_view_types`].
+    /// If true, will use view types. See [`Self::force_view_types`] for details
     pub fn with_force_view_types(mut self, use_views: bool) -> Self {
         self.options.global.schema_force_view_types = use_views;
         self
     }
+
+    /// Return `true` if binary types will be read as strings.
+    ///
+    /// If this returns true, DataFusion will instruct the parquet reader
+    /// to read binary columns such as `Binary` or `BinaryView` as the
+    /// corresponding string type such as `Utf8` or `LargeUtf8`.
+    /// The parquet reader has special optimizations for `Utf8` and `LargeUtf8`
+    /// validation, and such queries are significantly faster than reading
+    /// binary columns and then casting to string columns.
+    pub fn binary_as_string(&self) -> bool {
+        self.options.global.binary_as_string
+    }
+
+    /// If true, will read binary types as strings. See [`Self::binary_as_string`] for details
+    pub fn with_binary_as_string(mut self, binary_as_string: bool) -> Self {
+        self.options.global.binary_as_string = binary_as_string;
+        self
+    }
 }

 /// Clears all metadata (Schema level and field level) on an iterator

@@ -350,6 +367,12 @@ impl FileFormat for ParquetFormat {
             Schema::try_merge(schemas)
         }?;

+        let schema = if self.binary_as_string() {
+            transform_binary_to_string(&schema)
+        } else {
+            schema
+        };
+
         let schema = if self.force_view_types() {
             transform_schema_to_view(&schema)
         } else {

@@ -552,6 +575,10 @@ pub fn statistics_from_parquet_meta_calc(
         file_metadata.schema_descr(),
         file_metadata.key_value_metadata(),
     )?;
+    if let Some(merged) = coerce_file_schema_to_string_type(&table_schema, &file_schema) {
+        file_schema = merged;
+    }
+
     if let Some(merged) = coerce_file_schema_to_view_type(&table_schema, &file_schema) {
         file_schema = merged;
     }
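
The new builder methods also give per-table control over the option. A minimal sketch of registering a listing table with `binary_as_string` enabled, assuming the usual `ListingOptions` setup (the table name and directory are placeholders):

use std::sync::Arc;

use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::ListingOptions;
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Enable the option on this table's format only, leaving the
    // session-wide default untouched
    let format = ParquetFormat::default().with_binary_as_string(true);
    let options =
        ListingOptions::new(Arc::new(format)).with_file_extension(".parquet");

    ctx.register_listing_table("hits", "data/hits/", options, None, None)
        .await?;
    Ok(())
}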

datafusion/core/src/datasource/physical_plan/parquet/opener.rs

+16 -7

@@ -17,7 +17,9 @@

 //! [`ParquetOpener`] for opening Parquet files

-use crate::datasource::file_format::coerce_file_schema_to_view_type;
+use crate::datasource::file_format::{
+    coerce_file_schema_to_string_type, coerce_file_schema_to_view_type,
+};
 use crate::datasource::physical_plan::parquet::page_filter::PagePruningAccessPlanFilter;
 use crate::datasource::physical_plan::parquet::row_group_filter::RowGroupAccessPlanFilter;
 use crate::datasource::physical_plan::parquet::{

@@ -80,7 +82,7 @@ pub(super) struct ParquetOpener {
 }

 impl FileOpener for ParquetOpener {
-    fn open(&self, file_meta: FileMeta) -> datafusion_common::Result<FileOpenFuture> {
+    fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
         let file_range = file_meta.range.clone();
         let extensions = file_meta.extensions.clone();
         let file_name = file_meta.location().to_string();

@@ -121,7 +123,14 @@ impl FileOpener for ParquetOpener {
             let mut metadata_timer = file_metrics.metadata_load_time.timer();
             let metadata =
                 ArrowReaderMetadata::load_async(&mut reader, options.clone()).await?;
-            let mut schema = metadata.schema().clone();
+            let mut schema = Arc::clone(metadata.schema());
+
+            if let Some(merged) =
+                coerce_file_schema_to_string_type(&table_schema, &schema)
+            {
+                schema = Arc::new(merged);
+            }
+
             // read with view types
             if let Some(merged) = coerce_file_schema_to_view_type(&table_schema, &schema)
             {

@@ -130,16 +139,16 @@ impl FileOpener for ParquetOpener {

             let options = ArrowReaderOptions::new()
                 .with_page_index(enable_page_index)
-                .with_schema(schema.clone());
+                .with_schema(Arc::clone(&schema));
             let metadata =
-                ArrowReaderMetadata::try_new(metadata.metadata().clone(), options)?;
+                ArrowReaderMetadata::try_new(Arc::clone(metadata.metadata()), options)?;

             metadata_timer.stop();

             let mut builder =
                 ParquetRecordBatchStreamBuilder::new_with_metadata(reader, metadata);

-            let file_schema = builder.schema().clone();
+            let file_schema = Arc::clone(builder.schema());

             let (schema_mapping, adapted_projections) =
                 schema_adapter.map_schema(&file_schema)?;

@@ -177,7 +186,7 @@ impl FileOpener for ParquetOpener {

             // Determine which row groups to actually read. The idea is to skip
             // as many row groups as possible based on the metadata and query
-            let file_metadata = builder.metadata().clone();
+            let file_metadata = Arc::clone(builder.metadata());
             let predicate = pruning_predicate.as_ref().map(|p| p.as_ref());
             let rg_metadata = file_metadata.row_groups();
             // track which row groups to actually read
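
Note the ordering in all three call sites touched by this commit (schema inference, statistics calculation, and the opener): the binary-to-string coercion runs before the view-type coercion, so a file-level `Binary` column in a table declared as `Utf8View` is mapped directly to `Utf8View` by `coerce_file_schema_to_string_type` rather than passing through an intermediate type.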

datafusion/proto-common/proto/datafusion_common.proto

+1

@@ -494,6 +494,7 @@ message ParquetOptions {
   bool bloom_filter_on_read = 26; // default = true
   bool bloom_filter_on_write = 27; // default = false
   bool schema_force_view_types = 28; // default = false
+  bool binary_as_string = 29; // default = false

   oneof metadata_size_hint_opt {
     uint64 metadata_size_hint = 4;
