Skip to content

Commit 4034040

Browse files
committed
feat(parquet): coerce_types flag for date64
1 parent b711f23 commit 4034040

File tree

8 files changed

+303
-55
lines changed

8 files changed

+303
-55
lines changed

parquet/src/arrow/array_reader/primitive_array.rs

Lines changed: 111 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -208,14 +208,8 @@ where
208208
// As there is not always a 1:1 mapping between Arrow and Parquet, there
209209
// are datatypes which we must convert explicitly.
210210
// These are:
211-
// - date64: we should cast int32 to date32, then date32 to date64.
212-
// - decimal: cast in32 to decimal, int64 to decimal
211+
// - decimal: cast int32 to decimal, int64 to decimal
213212
let array = match target_type {
214-
ArrowType::Date64 => {
215-
// this is cheap as it internally reinterprets the data
216-
let a = arrow_cast::cast(&array, &ArrowType::Date32)?;
217-
arrow_cast::cast(&a, target_type)?
218-
}
219213
ArrowType::Decimal128(p, s) => {
220214
// Apply conversion to all elements regardless of null slots as the conversion
221215
// to `i128` is infallible. This improves performance by avoiding a branch in
@@ -305,9 +299,9 @@ mod tests {
305299
use crate::util::test_common::rand_gen::make_pages;
306300
use crate::util::InMemoryPageIterator;
307301
use arrow::datatypes::ArrowPrimitiveType;
308-
use arrow_array::{Array, PrimitiveArray};
302+
use arrow_array::{Array, Date32Array, Date64Array, PrimitiveArray};
309303

310-
use arrow::datatypes::DataType::Decimal128;
304+
use arrow::datatypes::DataType::{Date32, Date64, Decimal128};
311305
use rand::distributions::uniform::SampleUniform;
312306
use std::collections::VecDeque;
313307

@@ -545,6 +539,14 @@ mod tests {
545539
arrow::datatypes::Int32Type,
546540
i32
547541
);
542+
test_primitive_array_reader_one_type!(
543+
crate::data_type::Int64Type,
544+
PhysicalType::INT64,
545+
"DATE",
546+
arrow::datatypes::Date64Type,
547+
arrow::datatypes::Int64Type,
548+
i64
549+
);
548550
test_primitive_array_reader_one_type!(
549551
crate::data_type::Int32Type,
550552
PhysicalType::INT32,
@@ -783,4 +785,104 @@ mod tests {
783785
assert_ne!(array, &data_decimal_array)
784786
}
785787
}
788+
789+
#[test]
790+
fn test_primitive_array_reader_date32_type() {
791+
// parquet `INT32` to date
792+
let message_type = "
793+
message test_schema {
794+
REQUIRED INT32 date1 (DATE);
795+
}
796+
";
797+
let schema = parse_message_type(message_type)
798+
.map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
799+
.unwrap();
800+
let column_desc = schema.column(0);
801+
802+
// create the array reader
803+
{
804+
let mut data = Vec::new();
805+
let mut page_lists = Vec::new();
806+
make_column_chunks::<Int32Type>(
807+
column_desc.clone(),
808+
Encoding::PLAIN,
809+
100,
810+
-99999999,
811+
99999999,
812+
&mut Vec::new(),
813+
&mut Vec::new(),
814+
&mut data,
815+
&mut page_lists,
816+
true,
817+
2,
818+
);
819+
let page_iterator = InMemoryPageIterator::new(page_lists);
820+
821+
let mut array_reader =
822+
PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
823+
.unwrap();
824+
825+
// read data from the reader
826+
// the data type is date
827+
let array = array_reader.next_batch(50).unwrap();
828+
assert_eq!(array.data_type(), &Date32);
829+
let array = array.as_any().downcast_ref::<Date32Array>().unwrap();
830+
let data_date_array = data[0..50]
831+
.iter()
832+
.copied()
833+
.map(Some)
834+
.collect::<Date32Array>();
835+
assert_eq!(array, &data_date_array);
836+
}
837+
}
838+
839+
#[test]
840+
fn test_primitive_array_reader_date64_type() {
841+
// parquet `INT64` to date
842+
let message_type = "
843+
message test_schema {
844+
REQUIRED INT64 date1 (DATE);
845+
}
846+
";
847+
let schema = parse_message_type(message_type)
848+
.map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
849+
.unwrap();
850+
let column_desc = schema.column(0);
851+
852+
// create the array reader
853+
{
854+
let mut data = Vec::new();
855+
let mut page_lists = Vec::new();
856+
make_column_chunks::<Int64Type>(
857+
column_desc.clone(),
858+
Encoding::PLAIN,
859+
100,
860+
-999999999999999999,
861+
999999999999999999,
862+
&mut Vec::new(),
863+
&mut Vec::new(),
864+
&mut data,
865+
&mut page_lists,
866+
true,
867+
2,
868+
);
869+
let page_iterator = InMemoryPageIterator::new(page_lists);
870+
871+
let mut array_reader =
872+
PrimitiveArrayReader::<Int64Type>::new(Box::new(page_iterator), column_desc, None)
873+
.unwrap();
874+
875+
// read data from the reader
876+
// the data type is date
877+
let array = array_reader.next_batch(50).unwrap();
878+
assert_eq!(array.data_type(), &Date64);
879+
let array = array.as_any().downcast_ref::<Date64Array>().unwrap();
880+
let data_date_array = data[0..50]
881+
.iter()
882+
.copied()
883+
.map(Some)
884+
.collect::<Date64Array>();
885+
assert_eq!(array, &data_date_array);
886+
}
887+
}
786888
}

parquet/src/arrow/arrow_reader/mod.rs

Lines changed: 113 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -948,8 +948,8 @@ mod tests {
948948
use arrow_array::builder::*;
949949
use arrow_array::cast::AsArray;
950950
use arrow_array::types::{
951-
Decimal128Type, Decimal256Type, DecimalType, Float16Type, Float32Type, Float64Type,
952-
Time32MillisecondType, Time64MicrosecondType,
951+
Date32Type, Date64Type, Decimal128Type, Decimal256Type, DecimalType, Float16Type,
952+
Float32Type, Float64Type, Time32MillisecondType, Time64MicrosecondType,
953953
};
954954
use arrow_array::*;
955955
use arrow_buffer::{i256, ArrowNativeType, Buffer, IntervalDayTime};
@@ -1288,6 +1288,117 @@ mod tests {
12881288
Ok(())
12891289
}
12901290

1291+
#[test]
1292+
fn test_date32_roundtrip() -> Result<()> {
1293+
use arrow_array::Date32Array;
1294+
1295+
let schema = Arc::new(Schema::new(vec![Field::new(
1296+
"date32",
1297+
ArrowDataType::Date32,
1298+
false,
1299+
)]));
1300+
1301+
let mut buf = Vec::with_capacity(1024);
1302+
1303+
let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
1304+
1305+
let original = RecordBatch::try_new(
1306+
schema,
1307+
vec![Arc::new(Date32Array::from(vec![
1308+
-1_000_000, -100_000, -10_000, -1_000, 0, 1_000, 10_000, 100_000, 1_000_000,
1309+
]))],
1310+
)?;
1311+
1312+
writer.write(&original)?;
1313+
writer.close()?;
1314+
1315+
let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
1316+
let ret = reader.next().unwrap()?;
1317+
assert_eq!(ret, original);
1318+
1319+
// Ensure can be downcast to the correct type
1320+
ret.column(0).as_primitive::<Date32Type>();
1321+
1322+
Ok(())
1323+
}
1324+
1325+
#[test]
1326+
fn test_date64_roundtrip() -> Result<()> {
1327+
use arrow_array::Date64Array;
1328+
1329+
let schema = Arc::new(Schema::new(vec![
1330+
Field::new("small-date64", ArrowDataType::Date64, false),
1331+
Field::new("big-date64", ArrowDataType::Date64, false),
1332+
Field::new("invalid-date64", ArrowDataType::Date64, false),
1333+
]));
1334+
1335+
let mut default_buf = Vec::with_capacity(1024);
1336+
let mut coerce_buf = Vec::with_capacity(1024);
1337+
1338+
let coerce_props = WriterProperties::builder().set_coerce_types(true).build();
1339+
1340+
let mut default_writer = ArrowWriter::try_new(&mut default_buf, schema.clone(), None)?;
1341+
let mut coerce_writer =
1342+
ArrowWriter::try_new(&mut coerce_buf, schema.clone(), Some(coerce_props))?;
1343+
1344+
static NUM_MILLISECONDS_IN_DAY: i64 = 1000 * 60 * 60 * 24;
1345+
1346+
let original = RecordBatch::try_new(
1347+
schema,
1348+
vec![
1349+
// small-date64
1350+
Arc::new(Date64Array::from(vec![
1351+
-1_000_000 * NUM_MILLISECONDS_IN_DAY,
1352+
-1_000 * NUM_MILLISECONDS_IN_DAY,
1353+
0,
1354+
1_000 * NUM_MILLISECONDS_IN_DAY,
1355+
1_000_000 * NUM_MILLISECONDS_IN_DAY,
1356+
])),
1357+
// big-date64
1358+
Arc::new(Date64Array::from(vec![
1359+
-10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
1360+
-1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
1361+
0,
1362+
1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
1363+
10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
1364+
])),
1365+
// invalid-date64
1366+
Arc::new(Date64Array::from(vec![
1367+
-1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
1368+
-1_000 * NUM_MILLISECONDS_IN_DAY + 1,
1369+
1,
1370+
1_000 * NUM_MILLISECONDS_IN_DAY + 1,
1371+
1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
1372+
])),
1373+
],
1374+
)?;
1375+
1376+
default_writer.write(&original)?;
1377+
coerce_writer.write(&original)?;
1378+
1379+
default_writer.close()?;
1380+
coerce_writer.close()?;
1381+
1382+
let mut default_reader = ParquetRecordBatchReader::try_new(Bytes::from(default_buf), 1024)?;
1383+
let mut coerce_reader = ParquetRecordBatchReader::try_new(Bytes::from(coerce_buf), 1024)?;
1384+
1385+
let default_ret = default_reader.next().unwrap()?;
1386+
let coerce_ret = coerce_reader.next().unwrap()?;
1387+
1388+
// Roundtrip should be successful when default writer used
1389+
assert_eq!(default_ret, original);
1390+
1391+
// Only small-date64 should roundtrip successfully when coerce_types writer is used
1392+
assert_eq!(coerce_ret.column(0), original.column(0));
1393+
assert_ne!(coerce_ret.column(1), original.column(1));
1394+
assert_ne!(coerce_ret.column(2), original.column(2));
1395+
1396+
// Ensure both can be downcast to the correct type
1397+
default_ret.column(0).as_primitive::<Date64Type>();
1398+
coerce_ret.column(0).as_primitive::<Date64Type>();
1399+
1400+
Ok(())
1401+
}
12911402
struct RandFixedLenGen {}
12921403

12931404
impl RandGen<FixedLenByteArrayType> for RandFixedLenGen {

parquet/src/arrow/arrow_reader/statistics.rs

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -371,8 +371,7 @@ macro_rules! get_statistics {
371371
[<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| x.copied()),
372372
))),
373373
DataType::Date64 => Ok(Arc::new(Date64Array::from_iter(
374-
[<$stat_type_prefix Int32StatsIterator>]::new($iterator)
375-
.map(|x| x.map(|x| i64::from(*x) * 24 * 60 * 60 * 1000)),
374+
[<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|x| x.copied()),
376375
))),
377376
DataType::Timestamp(unit, timezone) =>{
378377
let iter = [<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|x| x.copied());
@@ -945,19 +944,7 @@ macro_rules! get_data_page_statistics {
945944
})
946945
},
947946
DataType::Date32 => Ok(Arc::new(Date32Array::from_iter([<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator).flatten()))),
948-
DataType::Date64 => Ok(
949-
Arc::new(
950-
Date64Array::from_iter([<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator)
951-
.map(|x| {
952-
x.into_iter()
953-
.map(|x| {
954-
x.and_then(|x| i64::try_from(x).ok())
955-
})
956-
.map(|x| x.map(|x| x * 24 * 60 * 60 * 1000))
957-
}).flatten()
958-
)
959-
)
960-
),
947+
DataType::Date64 => Ok(Arc::new(Date64Array::from_iter([<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten()))),
961948
DataType::Decimal128(precision, scale) => Ok(Arc::new(
962949
Decimal128Array::from_iter([<$stat_type_prefix Decimal128DataPageStatsIterator>]::new($iterator).flatten()).with_precision_and_scale(*precision, *scale)?)),
963950
DataType::Decimal256(precision, scale) => Ok(Arc::new(

parquet/src/arrow/arrow_writer/mod.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -180,11 +180,11 @@ impl<W: Write + Send> ArrowWriter<W> {
180180
arrow_schema: SchemaRef,
181181
options: ArrowWriterOptions,
182182
) -> Result<Self> {
183+
let mut props = options.properties;
183184
let schema = match options.schema_root {
184-
Some(s) => arrow_to_parquet_schema_with_root(&arrow_schema, &s)?,
185-
None => arrow_to_parquet_schema(&arrow_schema)?,
185+
Some(s) => arrow_to_parquet_schema_with_root(&arrow_schema, &s, props.coerce_types())?,
186+
None => arrow_to_parquet_schema(&arrow_schema, props.coerce_types())?,
186187
};
187-
let mut props = options.properties;
188188
if !options.skip_arrow_metadata {
189189
// add serialized arrow schema
190190
add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
@@ -549,8 +549,8 @@ impl ArrowColumnChunk {
549549
/// ]));
550550
///
551551
/// // Compute the parquet schema
552-
/// let parquet_schema = arrow_to_parquet_schema(schema.as_ref()).unwrap();
553552
/// let props = Arc::new(WriterProperties::default());
553+
/// let parquet_schema = arrow_to_parquet_schema(schema.as_ref(), props.coerce_types()).unwrap();
554554
///
555555
/// // Create writers for each of the leaf columns
556556
/// let col_writers = get_column_writers(&parquet_schema, &props, &schema).unwrap();
@@ -858,6 +858,12 @@ fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result<usi
858858
}
859859
ColumnWriter::Int64ColumnWriter(ref mut typed) => {
860860
match column.data_type() {
861+
ArrowDataType::Date64 => {
862+
let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
863+
864+
let array = array.as_primitive::<Int64Type>();
865+
write_primitive(typed, array.values(), levels)
866+
}
861867
ArrowDataType::Int64 => {
862868
let array = column.as_primitive::<Int64Type>();
863869
write_primitive(typed, array.values(), levels)

0 commit comments

Comments
 (0)