
Commit 05c519e

Improve error messages if schema hint mismatches with parquet schema

1 parent 3426c8a

3 files changed: +129 −31 lines

3 files changed

+129
-31
lines changed

arrow-array/src/record_batch.rs

Lines changed: 2 additions & 2 deletions
```diff
@@ -1073,8 +1073,8 @@ mod tests {

         let a = Int64Array::from(vec![1, 2, 3, 4, 5]);

-        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]);
-        assert!(batch.is_err());
+        let err = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap_err();
+        assert_eq!(err.to_string(), "Invalid argument error: column types must match schema types, expected Int32 but found Int64 at column index 0");
     }

     #[test]
```
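
To see the new message end to end, here is a minimal standalone sketch of the behavior the updated test asserts. The single-field Int32 schema mirrors the test setup and is an assumption, not code from this commit:

```rust
use std::sync::Arc;

use arrow_array::{Int64Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};

fn main() {
    // The schema declares Int32, but the supplied column is Int64.
    let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
    let a = Int64Array::from(vec![1, 2, 3, 4, 5]);

    let err = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap_err();
    // Prints the improved message, which now names the offending column:
    // Invalid argument error: column types must match schema types,
    // expected Int32 but found Int64 at column index 0
    println!("{err}");
}
```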

parquet/src/arrow/arrow_reader/filter.rs

Lines changed: 8 additions & 0 deletions
```diff
@@ -18,6 +18,7 @@
 use crate::arrow::ProjectionMask;
 use arrow_array::{BooleanArray, RecordBatch};
 use arrow_schema::ArrowError;
+use std::fmt::{Debug, Formatter};

 /// A predicate operating on [`RecordBatch`]
 ///
@@ -108,11 +109,18 @@ where
 /// not contiguous.
 ///
 /// [`RowSelection`]: crate::arrow::arrow_reader::RowSelection
+
 pub struct RowFilter {
     /// A list of [`ArrowPredicate`]
     pub(crate) predicates: Vec<Box<dyn ArrowPredicate>>,
 }

+impl Debug for RowFilter {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(f, "RowFilter {{ {} predicates: }}", self.predicates.len())
+    }
+}
+
 impl RowFilter {
     /// Create a new [`RowFilter`] from an array of [`ArrowPredicate`]
     pub fn new(predicates: Vec<Box<dyn ArrowPredicate>>) -> Self {
```
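
A short sketch of how the new impl renders. The always-true predicate is illustrative only; the boxed predicates are opaque trait objects, so Debug can report only their count:

```rust
use arrow_array::{BooleanArray, RecordBatch};
use parquet::arrow::arrow_reader::{ArrowPredicateFn, RowFilter};
use parquet::arrow::ProjectionMask;

fn main() {
    // An illustrative predicate that keeps every row.
    let keep_all = ArrowPredicateFn::new(ProjectionMask::all(), |batch: RecordBatch| {
        Ok(BooleanArray::from(vec![true; batch.num_rows()]))
    });
    let filter = RowFilter::new(vec![Box::new(keep_all)]);

    // The closures inside the boxed predicates are not `Debug` themselves,
    // so the new impl prints only how many there are:
    println!("{filter:?}"); // RowFilter { 1 predicates: }
}
```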

parquet/src/arrow/arrow_reader/mod.rs

Lines changed: 119 additions & 29 deletions
```diff
@@ -25,6 +25,7 @@ use arrow_select::filter::prep_null_mask_filter;
 pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
 pub use selection::{RowSelection, RowSelector};
 use std::collections::VecDeque;
+use std::fmt::{Debug, Formatter};
 use std::sync::Arc;

 pub use crate::arrow::array_reader::RowGroups;
```
```diff
@@ -112,6 +113,24 @@ pub struct ArrowReaderBuilder<T> {
     pub(crate) offset: Option<usize>,
 }

+impl<T: Debug> Debug for ArrowReaderBuilder<T> {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("ArrowReaderBuilder<T>")
+            .field("input", &self.input)
+            .field("metadata", &self.metadata)
+            .field("schema", &self.schema)
+            .field("fields", &self.fields)
+            .field("batch_size", &self.batch_size)
+            .field("row_groups", &self.row_groups)
+            .field("projection", &self.projection)
+            .field("filter", &self.filter)
+            .field("selection", &self.selection)
+            .field("limit", &self.limit)
+            .field("offset", &self.offset)
+            .finish()
+    }
+}
+
 impl<T> ArrowReaderBuilder<T> {
     pub(crate) fn new_builder(input: T, metadata: ArrowReaderMetadata) -> Self {
         Self {
```
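
A hedged sketch of what this enables, assuming `parquet_bytes` holds a complete in-memory Parquet file (for example, the output of the `utf8_parquet()` helper added in the tests further below):

```rust
use bytes::Bytes;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::errors::Result;

/// `parquet_bytes` is assumed to contain a complete Parquet file.
fn dump_builder_config(parquet_bytes: Bytes) -> Result<()> {
    // `Bytes` implements both `ChunkReader` and `Debug`, so with this commit
    // the fully configured builder can be logged before `build()` is called.
    let builder =
        ParquetRecordBatchReaderBuilder::try_new(parquet_bytes)?.with_batch_size(1024);
    println!("{builder:?}");
    Ok(())
}
```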
```diff
@@ -518,37 +537,47 @@ impl ArrowReaderMetadata {
         // parquet_to_arrow_field_levels is expected to throw an error if the schemas have
         // different lengths, but we check here to be safe.
         if inferred_len != supplied_len {
-            Err(arrow_err!(format!(
-                "incompatible arrow schema, expected {} columns received {}",
+            return Err(arrow_err!(format!(
+                "Incompatible supplied Arrow schema: expected {} columns received {}",
                 inferred_len, supplied_len
-            )))
-        } else {
-            let diff_fields: Vec<_> = supplied_schema
-                .fields()
-                .iter()
-                .zip(fields.iter())
-                .filter_map(|(field1, field2)| {
-                    if field1 != field2 {
-                        Some(field1.name().clone())
-                    } else {
-                        None
-                    }
-                })
-                .collect();
+            )));
+        }

-            if !diff_fields.is_empty() {
-                Err(ParquetError::ArrowError(format!(
-                    "incompatible arrow schema, the following fields could not be cast: [{}]",
-                    diff_fields.join(", ")
-                )))
-            } else {
-                Ok(Self {
-                    metadata,
-                    schema: supplied_schema,
-                    fields: field_levels.levels.map(Arc::new),
-                })
+        let mut errors = Vec::new();
+
+        let field_iter = supplied_schema.fields().iter().zip(fields.iter());
+
+        for (field1, field2) in field_iter {
+            if field1.data_type() != field2.data_type() {
+                errors.push(format!(
+                    "data type mismatch for field {}: requested {:?} but found {:?}",
+                    field1.name(),
+                    field1.data_type(),
+                    field2.data_type()
+                ));
+            }
+            if field1.is_nullable() != field2.is_nullable() {
+                errors.push(format!(
+                    "nullability mismatch for field {}: expected {:?} but found {:?}",
+                    field1.name(),
+                    field1.is_nullable(),
+                    field2.is_nullable()
+                ));
             }
         }
+
+        if !errors.is_empty() {
+            let message = errors.join(", ");
+            return Err(ParquetError::ArrowError(format!(
+                "Incompatible supplied Arrow schema: {message}",
+            )));
+        }
+
+        Ok(Self {
+            metadata,
+            schema: supplied_schema,
+            fields: field_levels.levels.map(Arc::new),
+        })
     }

     /// Returns a reference to the [`ParquetMetaData`] for this parquet file
```
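
For contrast with the mismatch tests below, a hedged sketch of the happy path this validation permits: writing an Int32 column and re-reading it with the file's own schema supplied as the hint. All names here are illustrative:

```rust
use std::sync::Arc;

use arrow_array::{Int32Array, RecordBatch};
use bytes::Bytes;
use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
use parquet::arrow::ArrowWriter;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Write a single non-nullable Int32 column to an in-memory parquet file.
    let col = Int32Array::from(vec![1, 2, 3]);
    let batch = RecordBatch::try_from_iter(vec![("column1", Arc::new(col) as _)])?;
    let mut buf = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), None)?;
    writer.write(&batch)?;
    writer.close()?;

    // Supplying a schema hint that matches the file passes the checks above.
    let options = ArrowReaderOptions::new().with_schema(batch.schema());
    let reader =
        ParquetRecordBatchReaderBuilder::try_new_with_options(Bytes::from(buf), options)?
            .build()?;
    for maybe_batch in reader {
        assert_eq!(maybe_batch?.num_rows(), 3);
    }
    Ok(())
}
```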
```diff
@@ -571,6 +600,12 @@
 /// A newtype used within [`ReaderOptionsBuilder`] to distinguish sync readers from async
 pub struct SyncReader<T: ChunkReader>(T);

+impl<T: Debug + ChunkReader> Debug for SyncReader<T> {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_tuple("SyncReader").field(&self.0).finish()
+    }
+}
+
 /// A synchronous builder used to construct [`ParquetRecordBatchReader`] for a file
 ///
 /// For an async API see [`crate::arrow::async_reader::ParquetRecordBatchStreamBuilder`]
```
```diff
@@ -3474,7 +3509,7 @@ mod tests {
                 Field::new("col2_valid", ArrowDataType::Int32, false),
                 Field::new("col3_invalid", ArrowDataType::Int32, false),
             ])),
-            "Arrow: incompatible arrow schema, the following fields could not be cast: [col1_invalid, col3_invalid]",
+            "Arrow: Incompatible supplied Arrow schema: data type mismatch for field col1_invalid: requested Int32 but found Int64, data type mismatch for field col3_invalid: requested Int32 but found Int64",
         );
     }
```

```diff
@@ -3512,10 +3547,65 @@
                     false,
                 ),
             ])),
-            "Arrow: incompatible arrow schema, the following fields could not be cast: [nested]",
+            "Arrow: Incompatible supplied Arrow schema: data type mismatch for field nested: \
+            requested Struct([Field { name: \"nested1_valid\", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"nested1_invalid\", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }]) \
+            but found Struct([Field { name: \"nested1_valid\", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"nested1_invalid\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }])",
         );
     }

+    /// Return parquet data with a single column of utf8 strings
+    fn utf8_parquet() -> Bytes {
+        let input = StringArray::from_iter_values(vec!["foo", "bar", "baz"]);
+        let batch = RecordBatch::try_from_iter(vec![("column1", Arc::new(input) as _)]).unwrap();
+        let props = None;
+        // write parquet file with non nullable strings
+        let mut parquet_data = vec![];
+        let mut writer = ArrowWriter::try_new(&mut parquet_data, batch.schema(), props).unwrap();
+        writer.write(&batch).unwrap();
+        writer.close().unwrap();
+        Bytes::from(parquet_data)
+    }
+
+    #[test]
+    fn test_schema_error_bad_types() {
+        // verify incompatible schemas error on read
+        let parquet_data = utf8_parquet();
+
+        // Ask to read it back with an incompatible schema (int vs string)
+        let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
+            "column1",
+            arrow::datatypes::DataType::Int32,
+            false,
+        )]));
+
+        // read it back out
+        let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone());
+        let err =
+            ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
+                .unwrap_err();
+        assert_eq!(err.to_string(), "Arrow: Incompatible supplied Arrow schema: data type mismatch for field column1: requested Int32 but found Utf8")
+    }
+
+    #[test]
+    fn test_schema_error_bad_nullability() {
+        // verify incompatible schemas error on read
+        let parquet_data = utf8_parquet();
+
+        // Ask to read it back with an incompatible schema (nullability mismatch)
+        let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
+            "column1",
+            arrow::datatypes::DataType::Utf8,
+            true,
+        )]));
+
+        // read it back out
+        let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone());
+        let err =
+            ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
+                .unwrap_err();
+        assert_eq!(err.to_string(), "Arrow: Incompatible supplied Arrow schema: nullability mismatch for field column1: expected true but found false")
+    }
+
     #[test]
     fn test_read_binary_as_utf8() {
         let file = write_parquet_from_iter(vec![
```
