diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
index ae8395aef6a4..1c20fa7caa14 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
@@ -19,7 +19,7 @@
 // TODO: potentially move this to arrow-rs: https://github.com/apache/arrow-rs/issues/4328
 
-use arrow::{array::ArrayRef, datatypes::DataType};
+use arrow::{array::ArrayRef, datatypes::DataType, datatypes::TimeUnit};
 use arrow_array::{new_empty_array, new_null_array, UInt64Array};
 use arrow_schema::{Field, FieldRef, Schema};
 use datafusion_common::{
@@ -112,6 +112,26 @@ macro_rules! get_statistic {
             Some(DataType::UInt64) => {
                 Some(ScalarValue::UInt64(Some((*s.$func()) as u64)))
             }
+            Some(DataType::Timestamp(unit, timezone)) => {
+                Some(match unit {
+                    TimeUnit::Second => ScalarValue::TimestampSecond(
+                        Some(*s.$func()),
+                        timezone.clone(),
+                    ),
+                    TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(
+                        Some(*s.$func()),
+                        timezone.clone(),
+                    ),
+                    TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(
+                        Some(*s.$func()),
+                        timezone.clone(),
+                    ),
+                    TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(
+                        Some(*s.$func()),
+                        timezone.clone(),
+                    ),
+                })
+            }
             _ => Some(ScalarValue::Int64(Some(*s.$func()))),
         }
     }
@@ -395,7 +415,8 @@ mod test {
     use arrow_array::{
         new_null_array, Array, BinaryArray, BooleanArray, Date32Array, Date64Array,
         Decimal128Array, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array,
-        Int8Array, RecordBatch, StringArray, StructArray, TimestampNanosecondArray,
+        Int8Array, RecordBatch, StringArray, StructArray, TimestampMicrosecondArray,
+        TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
     };
     use arrow_schema::{Field, SchemaRef};
     use bytes::Bytes;
@@ -536,28 +557,209 @@ mod test {
     }
 
     #[test]
-    #[should_panic(
-        expected = "Inconsistent types in ScalarValue::iter_to_array. Expected Int64, got TimestampNanosecond(NULL, None)"
-    )]
-    // Due to https://github.com/apache/datafusion/issues/8295
     fn roundtrip_timestamp() {
         Test {
-            input: timestamp_array([
-                // row group 1
-                Some(1),
-                None,
-                Some(3),
-                // row group 2
-                Some(9),
-                Some(5),
+            input: timestamp_seconds_array(
+                [
+                    // row group 1
+                    Some(1),
+                    None,
+                    Some(3),
+                    // row group 2
+                    Some(9),
+                    Some(5),
+                    None,
+                    // row group 3
+                    None,
+                    None,
+                    None,
+                ],
                 None,
-                // row group 3
+            ),
+            expected_min: timestamp_seconds_array([Some(1), Some(5), None], None),
+            expected_max: timestamp_seconds_array([Some(3), Some(9), None], None),
+        }
+        .run();
+
+        Test {
+            input: timestamp_milliseconds_array(
+                [
+                    // row group 1
+                    Some(1),
+                    None,
+                    Some(3),
+                    // row group 2
+                    Some(9),
+                    Some(5),
+                    None,
+                    // row group 3
+                    None,
+                    None,
+                    None,
+                ],
                 None,
+            ),
+            expected_min: timestamp_milliseconds_array([Some(1), Some(5), None], None),
+            expected_max: timestamp_milliseconds_array([Some(3), Some(9), None], None),
+        }
+        .run();
+
+        Test {
+            input: timestamp_microseconds_array(
+                [
+                    // row group 1
+                    Some(1),
+                    None,
+                    Some(3),
+                    // row group 2
+                    Some(9),
+                    Some(5),
+                    None,
+                    // row group 3
+                    None,
+                    None,
+                    None,
+                ],
                 None,
+            ),
+            expected_min: timestamp_microseconds_array([Some(1), Some(5), None], None),
+            expected_max: timestamp_microseconds_array([Some(3), Some(9), None], None),
+        }
+        .run();
+
+        Test {
+            input: timestamp_nanoseconds_array(
+                [
+                    // row group 1
+                    Some(1),
+                    None,
+                    Some(3),
+                    // row group 2
+                    Some(9),
+                    Some(5),
+                    None,
+                    // row group 3
+                    None,
+                    None,
+                    None,
+                ],
                 None,
-            ]),
-            expected_min: timestamp_array([Some(1), Some(5), None]),
-            expected_max: timestamp_array([Some(3), Some(9), None]),
+            ),
+            expected_min: timestamp_nanoseconds_array([Some(1), Some(5), None], None),
+            expected_max: timestamp_nanoseconds_array([Some(3), Some(9), None], None),
+        }
+        .run()
+    }
+
+    #[test]
+    fn roundtrip_timestamp_timezoned() {
+        Test {
+            input: timestamp_seconds_array(
+                [
+                    // row group 1
+                    Some(1),
+                    None,
+                    Some(3),
+                    // row group 2
+                    Some(9),
+                    Some(5),
+                    None,
+                    // row group 3
+                    None,
+                    None,
+                    None,
+                ],
+                Some("UTC"),
+            ),
+            expected_min: timestamp_seconds_array([Some(1), Some(5), None], Some("UTC")),
+            expected_max: timestamp_seconds_array([Some(3), Some(9), None], Some("UTC")),
+        }
+        .run();
+
+        Test {
+            input: timestamp_milliseconds_array(
+                [
+                    // row group 1
+                    Some(1),
+                    None,
+                    Some(3),
+                    // row group 2
+                    Some(9),
+                    Some(5),
+                    None,
+                    // row group 3
+                    None,
+                    None,
+                    None,
+                ],
+                Some("UTC"),
+            ),
+            expected_min: timestamp_milliseconds_array(
+                [Some(1), Some(5), None],
+                Some("UTC"),
+            ),
+            expected_max: timestamp_milliseconds_array(
+                [Some(3), Some(9), None],
+                Some("UTC"),
+            ),
+        }
+        .run();
+
+        Test {
+            input: timestamp_microseconds_array(
+                [
+                    // row group 1
+                    Some(1),
+                    None,
+                    Some(3),
+                    // row group 2
+                    Some(9),
+                    Some(5),
+                    None,
+                    // row group 3
+                    None,
+                    None,
+                    None,
+                ],
+                Some("UTC"),
+            ),
+            expected_min: timestamp_microseconds_array(
+                [Some(1), Some(5), None],
+                Some("UTC"),
+            ),
+            expected_max: timestamp_microseconds_array(
+                [Some(3), Some(9), None],
+                Some("UTC"),
+            ),
+        }
+        .run();
+
+        Test {
+            input: timestamp_nanoseconds_array(
+                [
+                    // row group 1
+                    Some(1),
+                    None,
+                    Some(3),
+                    // row group 2
+                    Some(9),
+                    Some(5),
+                    None,
+                    // row group 3
+                    None,
+                    None,
+                    None,
+                ],
+                Some("UTC"),
+            ),
+            expected_min: timestamp_nanoseconds_array(
+                [Some(1), Some(5), None],
+                Some("UTC"),
+            ),
+            expected_max: timestamp_nanoseconds_array(
+                [Some(3), Some(9), None],
+                Some("UTC"),
+            ),
         }
         .run()
     }
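The `get_statistic!` arm added above is the heart of this change: parquet stores timestamp min/max statistics as physical `i64` values, and the arrow type of the column decides which `ScalarValue` variant (and which timezone annotation) wraps them. A standalone sketch of that mapping outside the macro — the free function name here is hypothetical, for illustration only:

```rust
use arrow::datatypes::{DataType, TimeUnit};
use datafusion_common::ScalarValue;

// Hypothetical free-function version of the new macro arm: the raw i64
// statistic is reinterpreted according to the column's arrow DataType,
// carrying the timezone annotation along.
fn i64_stat_to_scalar(raw: i64, data_type: &DataType) -> ScalarValue {
    match data_type {
        DataType::Timestamp(unit, timezone) => match unit {
            TimeUnit::Second => ScalarValue::TimestampSecond(Some(raw), timezone.clone()),
            TimeUnit::Millisecond => {
                ScalarValue::TimestampMillisecond(Some(raw), timezone.clone())
            }
            TimeUnit::Microsecond => {
                ScalarValue::TimestampMicrosecond(Some(raw), timezone.clone())
            }
            TimeUnit::Nanosecond => {
                ScalarValue::TimestampNanosecond(Some(raw), timezone.clone())
            }
        },
        // Everything else keeps the old behavior and falls back to Int64.
        _ => ScalarValue::Int64(Some(raw)),
    }
}
```

Propagating `timezone.clone()` is what makes the round-trip tests above pass: the extracted min/max arrays are compared against timezone-annotated expected arrays, whereas the old `Int64` fallback is what used to trigger the `should_panic` removed above.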
@@ -914,8 +1116,8 @@ mod test {
             // File has no min/max for timestamp_col
             .with_column(ExpectedColumn {
                 name: "timestamp_col",
-                expected_min: timestamp_array([None]),
-                expected_max: timestamp_array([None]),
+                expected_min: timestamp_nanoseconds_array([None], None),
+                expected_max: timestamp_nanoseconds_array([None], None),
             })
             .with_column(ExpectedColumn {
                 name: "year",
@@ -1135,9 +1337,48 @@ mod test {
         Arc::new(array)
     }
 
-    fn timestamp_array(input: impl IntoIterator<Item = Option<i64>>) -> ArrayRef {
+    fn timestamp_seconds_array(
+        input: impl IntoIterator<Item = Option<i64>>,
+        timezone: Option<&str>,
+    ) -> ArrayRef {
+        let array: TimestampSecondArray = input.into_iter().collect();
+        match timezone {
+            Some(tz) => Arc::new(array.with_timezone(tz)),
+            None => Arc::new(array),
+        }
+    }
+
+    fn timestamp_milliseconds_array(
+        input: impl IntoIterator<Item = Option<i64>>,
+        timezone: Option<&str>,
+    ) -> ArrayRef {
+        let array: TimestampMillisecondArray = input.into_iter().collect();
+        match timezone {
+            Some(tz) => Arc::new(array.with_timezone(tz)),
+            None => Arc::new(array),
+        }
+    }
+
+    fn timestamp_microseconds_array(
+        input: impl IntoIterator<Item = Option<i64>>,
+        timezone: Option<&str>,
+    ) -> ArrayRef {
+        let array: TimestampMicrosecondArray = input.into_iter().collect();
+        match timezone {
+            Some(tz) => Arc::new(array.with_timezone(tz)),
+            None => Arc::new(array),
+        }
+    }
+
+    fn timestamp_nanoseconds_array(
+        input: impl IntoIterator<Item = Option<i64>>,
+        timezone: Option<&str>,
+    ) -> ArrayRef {
         let array: TimestampNanosecondArray = input.into_iter().collect();
-        Arc::new(array)
+        match timezone {
+            Some(tz) => Arc::new(array.with_timezone(tz)),
+            None => Arc::new(array),
+        }
     }
 
     fn utf8_array<'a>(input: impl IntoIterator<Item = Option<&'a str>>) -> ArrayRef {
diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs b/datafusion/core/tests/parquet/arrow_statistics.rs
index eebf3447cbe9..2836cd2893f3 100644
--- a/datafusion/core/tests/parquet/arrow_statistics.rs
+++ b/datafusion/core/tests/parquet/arrow_statistics.rs
@@ -22,12 +22,16 @@ use std::fs::File;
 use std::sync::Arc;
 
 use arrow::compute::kernels::cast_utils::Parser;
-use arrow::datatypes::{Date32Type, Date64Type};
+use arrow::datatypes::{
+    Date32Type, Date64Type, TimestampMicrosecondType, TimestampMillisecondType,
+    TimestampNanosecondType, TimestampSecondType,
+};
 use arrow_array::{
     make_array, Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Date64Array,
     Decimal128Array, FixedSizeBinaryArray, Float32Array, Float64Array, Int16Array,
-    Int32Array, Int64Array, Int8Array, RecordBatch, StringArray, UInt16Array,
-    UInt32Array, UInt64Array, UInt8Array,
+    Int32Array, Int64Array, Int8Array, RecordBatch, StringArray,
+    TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
+    TimestampSecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
 };
 use arrow_schema::{DataType, Field, Schema};
 use datafusion::datasource::physical_plan::parquet::{
@@ -456,36 +460,40 @@ async fn test_int_8() {
 // timestamp
 #[tokio::test]
 async fn test_timestamp() {
-    // This creates a parquet files of 5 columns named "nanos", "micros", "millis", "seconds", "names"
+    // This creates a parquet file with 9 columns named "nanos", "nanos_timezoned", "micros", "micros_timezoned", "millis", "millis_timezoned", "seconds", "seconds_timezoned", "names"
     // "nanos" --> TimestampNanosecondArray
+    // "nanos_timezoned" --> TimestampNanosecondArray
     // "micros" --> TimestampMicrosecondArray
+    // "micros_timezoned" --> TimestampMicrosecondArray
     // "millis" --> TimestampMillisecondArray
+    // "millis_timezoned" --> TimestampMillisecondArray
     // "seconds" --> TimestampSecondArray
+    // "seconds_timezoned" --> TimestampSecondArray
     // "names" --> StringArray
     //
     // The file is created by 4 record batches, each has 5 rows.
     // Since the row group size is set to 5, those 4 batches will go into 4 row groups
-    // This creates a parquet files of 4 columns named "i8", "i16", "i32", "i64"
+    // This creates a parquet file with 8 columns named "nanos", "nanos_timezoned", "micros", "micros_timezoned", "millis", "millis_timezoned", "seconds", "seconds_timezoned"
     let reader = TestReader {
         scenario: Scenario::Timestamps,
        row_per_group: 5,
     };
 
+    let tz = "Pacific/Efate";
+
     Test {
         reader: reader.build().await,
-        // mins are [1577840461000000000, 1577840471000000000, 1577841061000000000, 1578704461000000000,]
-        expected_min: Arc::new(Int64Array::from(vec![
-            1577840461000000000,
-            1577840471000000000,
-            1577841061000000000,
-            1578704461000000000,
+        expected_min: Arc::new(TimestampNanosecondArray::from(vec![
+            TimestampNanosecondType::parse("2020-01-01T01:01:01"),
+            TimestampNanosecondType::parse("2020-01-01T01:01:11"),
+            TimestampNanosecondType::parse("2020-01-01T01:11:01"),
+            TimestampNanosecondType::parse("2020-01-11T01:01:01"),
         ])),
-        // maxes are [1577926861000000000, 1577926871000000000, 1577927461000000000, 1578790861000000000,]
-        expected_max: Arc::new(Int64Array::from(vec![
-            1577926861000000000,
-            1577926871000000000,
-            1577927461000000000,
-            1578790861000000000,
+        expected_max: Arc::new(TimestampNanosecondArray::from(vec![
+            TimestampNanosecondType::parse("2020-01-02T01:01:01"),
+            TimestampNanosecondType::parse("2020-01-02T01:01:11"),
+            TimestampNanosecondType::parse("2020-01-02T01:11:01"),
+            TimestampNanosecondType::parse("2020-01-12T01:01:01"),
         ])),
         // nulls are [1, 1, 1, 1]
         expected_null_counts: UInt64Array::from(vec![1, 1, 1, 1]),
@@ -495,21 +503,48 @@ async fn test_timestamp() {
     }
     .run();
 
-    // micros
+    Test {
+        reader: reader.build().await,
+        expected_min: Arc::new(
+            TimestampNanosecondArray::from(vec![
+                TimestampNanosecondType::parse("2020-01-01T01:01:01"),
+                TimestampNanosecondType::parse("2020-01-01T01:01:11"),
+                TimestampNanosecondType::parse("2020-01-01T01:11:01"),
+                TimestampNanosecondType::parse("2020-01-11T01:01:01"),
+            ])
+            .with_timezone(tz),
+        ),
+        expected_max: Arc::new(
+            TimestampNanosecondArray::from(vec![
+                TimestampNanosecondType::parse("2020-01-02T01:01:01"),
+                TimestampNanosecondType::parse("2020-01-02T01:01:11"),
+                TimestampNanosecondType::parse("2020-01-02T01:11:01"),
+                TimestampNanosecondType::parse("2020-01-12T01:01:01"),
+            ])
+            .with_timezone(tz),
+        ),
+        // nulls are [1, 1, 1, 1]
+        expected_null_counts: UInt64Array::from(vec![1, 1, 1, 1]),
+        // row counts are [5, 5, 5, 5]
+        expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]),
+        column_name: "nanos_timezoned",
+    }
+    .run();
+
+    // micros
     Test {
         reader: reader.build().await,
-        expected_min: Arc::new(Int64Array::from(vec![
-            1577840461000000,
-            1577840471000000,
-            1577841061000000,
-            1578704461000000,
+        expected_min: Arc::new(TimestampMicrosecondArray::from(vec![
+            TimestampMicrosecondType::parse("2020-01-01T01:01:01"),
+            TimestampMicrosecondType::parse("2020-01-01T01:01:11"),
+            TimestampMicrosecondType::parse("2020-01-01T01:11:01"),
+            TimestampMicrosecondType::parse("2020-01-11T01:01:01"),
        ])),
-        expected_max: Arc::new(Int64Array::from(vec![
-            1577926861000000,
-            1577926871000000,
-            1577927461000000,
-            1578790861000000,
+        expected_max: Arc::new(TimestampMicrosecondArray::from(vec![
+            TimestampMicrosecondType::parse("2020-01-02T01:01:01"),
+            TimestampMicrosecondType::parse("2020-01-02T01:01:11"),
+            TimestampMicrosecondType::parse("2020-01-02T01:11:01"),
+            TimestampMicrosecondType::parse("2020-01-12T01:01:01"),
         ])),
         expected_null_counts: UInt64Array::from(vec![1, 1, 1, 1]),
         expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]),
@@ -517,20 +552,48 @@ async fn test_timestamp() {
     }
     .run();
 
+    Test {
+        reader: reader.build().await,
+        expected_min: Arc::new(
+            TimestampMicrosecondArray::from(vec![
+                TimestampMicrosecondType::parse("2020-01-01T01:01:01"),
+                TimestampMicrosecondType::parse("2020-01-01T01:01:11"),
+                TimestampMicrosecondType::parse("2020-01-01T01:11:01"),
+                TimestampMicrosecondType::parse("2020-01-11T01:01:01"),
+            ])
+            .with_timezone(tz),
+        ),
+        expected_max: Arc::new(
+            TimestampMicrosecondArray::from(vec![
+                TimestampMicrosecondType::parse("2020-01-02T01:01:01"),
+                TimestampMicrosecondType::parse("2020-01-02T01:01:11"),
+                TimestampMicrosecondType::parse("2020-01-02T01:11:01"),
+                TimestampMicrosecondType::parse("2020-01-12T01:01:01"),
+            ])
+            .with_timezone(tz),
+        ),
+        // nulls are [1, 1, 1, 1]
+        expected_null_counts: UInt64Array::from(vec![1, 1, 1, 1]),
+        // row counts are [5, 5, 5, 5]
+        expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]),
+        column_name: "micros_timezoned",
+    }
+    .run();
+
     // millis
     Test {
         reader: reader.build().await,
-        expected_min: Arc::new(Int64Array::from(vec![
-            1577840461000,
-            1577840471000,
-            1577841061000,
-            1578704461000,
+        expected_min: Arc::new(TimestampMillisecondArray::from(vec![
+            TimestampMillisecondType::parse("2020-01-01T01:01:01"),
+            TimestampMillisecondType::parse("2020-01-01T01:01:11"),
+            TimestampMillisecondType::parse("2020-01-01T01:11:01"),
+            TimestampMillisecondType::parse("2020-01-11T01:01:01"),
         ])),
-        expected_max: Arc::new(Int64Array::from(vec![
-            1577926861000,
-            1577926871000,
-            1577927461000,
-            1578790861000,
+        expected_max: Arc::new(TimestampMillisecondArray::from(vec![
+            TimestampMillisecondType::parse("2020-01-02T01:01:01"),
+            TimestampMillisecondType::parse("2020-01-02T01:01:11"),
+            TimestampMillisecondType::parse("2020-01-02T01:11:01"),
+            TimestampMillisecondType::parse("2020-01-12T01:01:01"),
         ])),
         expected_null_counts: UInt64Array::from(vec![1, 1, 1, 1]),
         expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]),
@@ -538,30 +601,96 @@ async fn test_timestamp() {
     }
     .run();
 
+    Test {
+        reader: reader.build().await,
+        expected_min: Arc::new(
+            TimestampMillisecondArray::from(vec![
+                TimestampMillisecondType::parse("2020-01-01T01:01:01"),
+                TimestampMillisecondType::parse("2020-01-01T01:01:11"),
+                TimestampMillisecondType::parse("2020-01-01T01:11:01"),
+                TimestampMillisecondType::parse("2020-01-11T01:01:01"),
+            ])
+            .with_timezone(tz),
+        ),
+        expected_max: Arc::new(
+            TimestampMillisecondArray::from(vec![
+                TimestampMillisecondType::parse("2020-01-02T01:01:01"),
+                TimestampMillisecondType::parse("2020-01-02T01:01:11"),
+                TimestampMillisecondType::parse("2020-01-02T01:11:01"),
+                TimestampMillisecondType::parse("2020-01-12T01:01:01"),
+            ])
+            .with_timezone(tz),
+        ),
+        // nulls are [1, 1, 1, 1]
+        expected_null_counts: UInt64Array::from(vec![1, 1, 1, 1]),
+        // row counts are [5, 5, 5, 5]
+        expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]),
+        column_name: "millis_timezoned",
+    }
+    .run();
+
     // seconds
     Test {
         reader: reader.build().await,
-        expected_min: Arc::new(Int64Array::from(vec![
-            1577840461, 1577840471, 1577841061, 1578704461,
+        expected_min: Arc::new(TimestampSecondArray::from(vec![
+            TimestampSecondType::parse("2020-01-01T01:01:01"),
+            TimestampSecondType::parse("2020-01-01T01:01:11"),
+            TimestampSecondType::parse("2020-01-01T01:11:01"),
+            TimestampSecondType::parse("2020-01-11T01:01:01"),
         ])),
-        expected_max: Arc::new(Int64Array::from(vec![
-            1577926861, 1577926871, 1577927461, 1578790861,
+        expected_max: Arc::new(TimestampSecondArray::from(vec![
+            TimestampSecondType::parse("2020-01-02T01:01:01"),
+            TimestampSecondType::parse("2020-01-02T01:01:11"),
+            TimestampSecondType::parse("2020-01-02T01:11:01"),
+            TimestampSecondType::parse("2020-01-12T01:01:01"),
        ])),
         expected_null_counts: UInt64Array::from(vec![1, 1, 1, 1]),
         expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]),
         column_name: "seconds",
     }
     .run();
+
+    Test {
+        reader: reader.build().await,
+        expected_min: Arc::new(
+            TimestampSecondArray::from(vec![
+                TimestampSecondType::parse("2020-01-01T01:01:01"),
+                TimestampSecondType::parse("2020-01-01T01:01:11"),
+                TimestampSecondType::parse("2020-01-01T01:11:01"),
+                TimestampSecondType::parse("2020-01-11T01:01:01"),
+            ])
+            .with_timezone(tz),
+        ),
+        expected_max: Arc::new(
+            TimestampSecondArray::from(vec![
+                TimestampSecondType::parse("2020-01-02T01:01:01"),
+                TimestampSecondType::parse("2020-01-02T01:01:11"),
+                TimestampSecondType::parse("2020-01-02T01:11:01"),
+                TimestampSecondType::parse("2020-01-12T01:01:01"),
+            ])
+            .with_timezone(tz),
+        ),
+        // nulls are [1, 1, 1, 1]
+        expected_null_counts: UInt64Array::from(vec![1, 1, 1, 1]),
+        // row counts are [5, 5, 5, 5]
+        expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]),
+        column_name: "seconds_timezoned",
+    }
+    .run();
 }
 
 // timestamp with different row group sizes
 #[tokio::test]
 async fn test_timestamp_diff_rg_sizes() {
-    // This creates a parquet files of 5 columns named "nanos", "micros", "millis", "seconds", "names"
+    // This creates a parquet file with 9 columns named "nanos", "nanos_timezoned", "micros", "micros_timezoned", "millis", "millis_timezoned", "seconds", "seconds_timezoned", "names"
     // "nanos" --> TimestampNanosecondArray
+    // "nanos_timezoned" --> TimestampNanosecondArray
     // "micros" --> TimestampMicrosecondArray
+    // "micros_timezoned" --> TimestampMicrosecondArray
     // "millis" --> TimestampMillisecondArray
+    // "millis_timezoned" --> TimestampMillisecondArray
     // "seconds" --> TimestampSecondArray
+    // "seconds_timezoned" --> TimestampSecondArray
     // "names" --> StringArray
     //
     // The file is created by 4 record batches (each has a null row), each has 5 rows but then will be split into 3 row groups with size 8, 8, 4
@@ -570,19 +699,19 @@ async fn test_timestamp_diff_rg_sizes() {
     let reader = TestReader {
         scenario: Scenario::Timestamps,
         row_per_group: 8, // note that the row group size is 8
     };
 
+    let tz = "Pacific/Efate";
+
     Test {
         reader: reader.build().await,
-        // mins are [1577840461000000000, 1577841061000000000, 1578704521000000000]
-        expected_min: Arc::new(Int64Array::from(vec![
-            1577840461000000000,
-            1577841061000000000,
-            1578704521000000000,
+        expected_min: Arc::new(TimestampNanosecondArray::from(vec![
+            TimestampNanosecondType::parse("2020-01-01T01:01:01"),
+            TimestampNanosecondType::parse("2020-01-01T01:11:01"),
+            TimestampNanosecondType::parse("2020-01-11T01:02:01"),
        ])),
-        // maxes are [1577926861000000000, 1578704461000000000, 157879086100000000]
-        expected_max: Arc::new(Int64Array::from(vec![
-            1577926861000000000,
-            1578704461000000000,
-            1578790861000000000,
+        expected_max: Arc::new(TimestampNanosecondArray::from(vec![
+            TimestampNanosecondType::parse("2020-01-02T01:01:01"),
+            TimestampNanosecondType::parse("2020-01-11T01:01:01"),
+            TimestampNanosecondType::parse("2020-01-12T01:01:01"),
         ])),
         // nulls are [1, 2, 1]
         expected_null_counts: UInt64Array::from(vec![1, 2, 1]),
@@ -592,18 +721,44 @@ async fn test_timestamp_diff_rg_sizes() {
     }
     .run();
 
+    Test {
+        reader: reader.build().await,
+        expected_min: Arc::new(
+            TimestampNanosecondArray::from(vec![
+                TimestampNanosecondType::parse("2020-01-01T01:01:01"),
+                TimestampNanosecondType::parse("2020-01-01T01:11:01"),
+                TimestampNanosecondType::parse("2020-01-11T01:02:01"),
+            ])
+            .with_timezone(tz),
+        ),
+        expected_max: Arc::new(
+            TimestampNanosecondArray::from(vec![
+                TimestampNanosecondType::parse("2020-01-02T01:01:01"),
+                TimestampNanosecondType::parse("2020-01-11T01:01:01"),
+                TimestampNanosecondType::parse("2020-01-12T01:01:01"),
+            ])
+            .with_timezone(tz),
+        ),
+        // nulls are [1, 2, 1]
+        expected_null_counts: UInt64Array::from(vec![1, 2, 1]),
+        // row counts are [8, 8, 4]
+        expected_row_counts: UInt64Array::from(vec![8, 8, 4]),
+        column_name: "nanos_timezoned",
+    }
+    .run();
+
     // micros
     Test {
         reader: reader.build().await,
-        expected_min: Arc::new(Int64Array::from(vec![
-            1577840461000000,
-            1577841061000000,
-            1578704521000000,
+        expected_min: Arc::new(TimestampMicrosecondArray::from(vec![
+            TimestampMicrosecondType::parse("2020-01-01T01:01:01"),
+            TimestampMicrosecondType::parse("2020-01-01T01:11:01"),
+            TimestampMicrosecondType::parse("2020-01-11T01:02:01"),
         ])),
-        expected_max: Arc::new(Int64Array::from(vec![
-            1577926861000000,
-            1578704461000000,
-            1578790861000000,
+        expected_max: Arc::new(TimestampMicrosecondArray::from(vec![
+            TimestampMicrosecondType::parse("2020-01-02T01:01:01"),
+            TimestampMicrosecondType::parse("2020-01-11T01:01:01"),
+            TimestampMicrosecondType::parse("2020-01-12T01:01:01"),
         ])),
         expected_null_counts: UInt64Array::from(vec![1, 2, 1]),
         expected_row_counts: UInt64Array::from(vec![8, 8, 4]),
@@ -611,18 +766,44 @@ async fn test_timestamp_diff_rg_sizes() {
     }
     .run();
 
+    Test {
+        reader: reader.build().await,
+        expected_min: Arc::new(
+            TimestampMicrosecondArray::from(vec![
+                TimestampMicrosecondType::parse("2020-01-01T01:01:01"),
+                TimestampMicrosecondType::parse("2020-01-01T01:11:01"),
+                TimestampMicrosecondType::parse("2020-01-11T01:02:01"),
+            ])
+            .with_timezone(tz),
+        ),
+        expected_max: Arc::new(
+            TimestampMicrosecondArray::from(vec![
+                TimestampMicrosecondType::parse("2020-01-02T01:01:01"),
+                TimestampMicrosecondType::parse("2020-01-11T01:01:01"),
+                TimestampMicrosecondType::parse("2020-01-12T01:01:01"),
+            ])
+            .with_timezone(tz),
+        ),
+        // nulls are [1, 2, 1]
+        expected_null_counts: UInt64Array::from(vec![1, 2, 1]),
+        // row counts are [8, 8, 4]
+        expected_row_counts: UInt64Array::from(vec![8, 8, 4]),
+        column_name: "micros_timezoned",
+    }
+    .run();
+
     // millis
     Test {
         reader: reader.build().await,
-        expected_min: Arc::new(Int64Array::from(vec![
-            1577840461000,
-            1577841061000,
-            1578704521000,
+        expected_min: Arc::new(TimestampMillisecondArray::from(vec![
+            TimestampMillisecondType::parse("2020-01-01T01:01:01"),
+            TimestampMillisecondType::parse("2020-01-01T01:11:01"),
+            TimestampMillisecondType::parse("2020-01-11T01:02:01"),
        ])),
-        expected_max: Arc::new(Int64Array::from(vec![
-            1577926861000,
-            1578704461000,
-            1578790861000,
+        expected_max: Arc::new(TimestampMillisecondArray::from(vec![
+            TimestampMillisecondType::parse("2020-01-02T01:01:01"),
+            TimestampMillisecondType::parse("2020-01-11T01:01:01"),
+            TimestampMillisecondType::parse("2020-01-12T01:01:01"),
         ])),
         expected_null_counts: UInt64Array::from(vec![1, 2, 1]),
         expected_row_counts: UInt64Array::from(vec![8, 8, 4]),
@@ -630,20 +811,76 @@ async fn test_timestamp_diff_rg_sizes() {
     }
     .run();
 
+    Test {
+        reader: reader.build().await,
+        expected_min: Arc::new(
+            TimestampMillisecondArray::from(vec![
+                TimestampMillisecondType::parse("2020-01-01T01:01:01"),
+                TimestampMillisecondType::parse("2020-01-01T01:11:01"),
+                TimestampMillisecondType::parse("2020-01-11T01:02:01"),
+            ])
+            .with_timezone(tz),
+        ),
+        expected_max: Arc::new(
+            TimestampMillisecondArray::from(vec![
+                TimestampMillisecondType::parse("2020-01-02T01:01:01"),
+                TimestampMillisecondType::parse("2020-01-11T01:01:01"),
+                TimestampMillisecondType::parse("2020-01-12T01:01:01"),
+            ])
+            .with_timezone(tz),
+        ),
+        // nulls are [1, 2, 1]
+        expected_null_counts: UInt64Array::from(vec![1, 2, 1]),
+        // row counts are [8, 8, 4]
+        expected_row_counts: UInt64Array::from(vec![8, 8, 4]),
+        column_name: "millis_timezoned",
+    }
+    .run();
+
     // seconds
     Test {
         reader: reader.build().await,
-        expected_min: Arc::new(Int64Array::from(vec![
-            1577840461, 1577841061, 1578704521,
+        expected_min: Arc::new(TimestampSecondArray::from(vec![
+            TimestampSecondType::parse("2020-01-01T01:01:01"),
+            TimestampSecondType::parse("2020-01-01T01:11:01"),
+            TimestampSecondType::parse("2020-01-11T01:02:01"),
         ])),
-        expected_max: Arc::new(Int64Array::from(vec![
-            1577926861, 1578704461, 1578790861,
+        expected_max: Arc::new(TimestampSecondArray::from(vec![
+            TimestampSecondType::parse("2020-01-02T01:01:01"),
+            TimestampSecondType::parse("2020-01-11T01:01:01"),
+            TimestampSecondType::parse("2020-01-12T01:01:01"),
         ])),
         expected_null_counts: UInt64Array::from(vec![1, 2, 1]),
         expected_row_counts: UInt64Array::from(vec![8, 8, 4]),
         column_name: "seconds",
     }
     .run();
+
+    Test {
+        reader: reader.build().await,
+        expected_min: Arc::new(
+            TimestampSecondArray::from(vec![
+                TimestampSecondType::parse("2020-01-01T01:01:01"),
+                TimestampSecondType::parse("2020-01-01T01:11:01"),
+                TimestampSecondType::parse("2020-01-11T01:02:01"),
+            ])
+            .with_timezone(tz),
+        ),
+        expected_max: Arc::new(
+            TimestampSecondArray::from(vec![
+                TimestampSecondType::parse("2020-01-02T01:01:01"),
+                TimestampSecondType::parse("2020-01-11T01:01:01"),
+                TimestampSecondType::parse("2020-01-12T01:01:01"),
+            ])
+            .with_timezone(tz),
+        ),
+        // nulls are [1, 2, 1]
+        expected_null_counts: UInt64Array::from(vec![1, 2, 1]),
+        // row counts are [8, 8, 4]
+        expected_row_counts: UInt64Array::from(vec![8, 8, 4]),
+        column_name: "seconds_timezoned",
+    }
+    .run();
 }
 
 // date with different row group sizes
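Before the `mod.rs` changes below, one property both the expected arrays above and `make_timestamp_batch` below rely on: arrow's `with_timezone` only re-labels the array's `DataType`; the stored epoch `i64` values are untouched. A minimal, self-contained sketch (the variable names here are ours, not from the patch):

```rust
use arrow::datatypes::{DataType, TimeUnit};
use arrow_array::{Array, TimestampSecondArray};

fn main() {
    let plain = TimestampSecondArray::from(vec![Some(1), None]);
    let utc = TimestampSecondArray::from(vec![Some(1), None]).with_timezone("UTC");

    // The timezone lives only in the data type...
    assert_eq!(plain.data_type(), &DataType::Timestamp(TimeUnit::Second, None));
    assert_eq!(
        utc.data_type(),
        &DataType::Timestamp(TimeUnit::Second, Some("UTC".into()))
    );
    // ...while the underlying i64 values are identical.
    assert_eq!(plain.value(0), utc.value(0));
}
```

That is why the `*_timezoned` columns can share min/max instants with their naive siblings while still exercising the new `DataType::Timestamp(_, Some(tz))` statistics path.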
diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs
index 94ae9ff601ec..41a0a86aa8d3 100644
--- a/datafusion/core/tests/parquet/mod.rs
+++ b/datafusion/core/tests/parquet/mod.rs
@@ -332,9 +332,13 @@ fn make_boolean_batch(v: Vec<Option<bool>>) -> RecordBatch {
 ///
 /// Columns are named:
 /// "nanos" --> TimestampNanosecondArray
+/// "nanos_timezoned" --> TimestampNanosecondArray with timezone
 /// "micros" --> TimestampMicrosecondArray
+/// "micros_timezoned" --> TimestampMicrosecondArray with timezone
 /// "millis" --> TimestampMillisecondArray
+/// "millis_timezoned" --> TimestampMillisecondArray with timezone
 /// "seconds" --> TimestampSecondArray
+/// "seconds_timezoned" --> TimestampSecondArray with timezone
 /// "names" --> StringArray
 fn make_timestamp_batch(offset: Duration) -> RecordBatch {
     let ts_strings = vec![
@@ -345,6 +349,8 @@ fn make_timestamp_batch(offset: Duration) -> RecordBatch {
         Some("2020-01-02T01:01:01.0000000000001"),
     ];
 
+    let tz_string = "Pacific/Efate";
+
     let offset_nanos = offset.num_nanoseconds().expect("non overflow nanos");
 
     let ts_nanos = ts_strings
@@ -382,19 +388,47 @@ fn make_timestamp_batch(offset: Duration) -> RecordBatch {
         .map(|(i, _)| format!("Row {i} + {offset}"))
         .collect::<Vec<_>>();
 
-    let arr_nanos = TimestampNanosecondArray::from(ts_nanos);
-    let arr_micros = TimestampMicrosecondArray::from(ts_micros);
-    let arr_millis = TimestampMillisecondArray::from(ts_millis);
-    let arr_seconds = TimestampSecondArray::from(ts_seconds);
+    let arr_nanos = TimestampNanosecondArray::from(ts_nanos.clone());
+    let arr_nanos_timezoned =
+        TimestampNanosecondArray::from(ts_nanos).with_timezone(tz_string);
+    let arr_micros = TimestampMicrosecondArray::from(ts_micros.clone());
+    let arr_micros_timezoned =
+        TimestampMicrosecondArray::from(ts_micros).with_timezone(tz_string);
+    let arr_millis = TimestampMillisecondArray::from(ts_millis.clone());
+    let arr_millis_timezoned =
+        TimestampMillisecondArray::from(ts_millis).with_timezone(tz_string);
+    let arr_seconds = TimestampSecondArray::from(ts_seconds.clone());
+    let arr_seconds_timezoned =
+        TimestampSecondArray::from(ts_seconds).with_timezone(tz_string);
 
     let names = names.iter().map(|s| s.as_str()).collect::<Vec<_>>();
     let arr_names = StringArray::from(names);
 
     let schema = Schema::new(vec![
         Field::new("nanos", arr_nanos.data_type().clone(), true),
+        Field::new(
+            "nanos_timezoned",
+            arr_nanos_timezoned.data_type().clone(),
+            true,
+        ),
         Field::new("micros", arr_micros.data_type().clone(), true),
+        Field::new(
+            "micros_timezoned",
+            arr_micros_timezoned.data_type().clone(),
+            true,
+        ),
         Field::new("millis", arr_millis.data_type().clone(), true),
+        Field::new(
+            "millis_timezoned",
+            arr_millis_timezoned.data_type().clone(),
+            true,
+        ),
         Field::new("seconds", arr_seconds.data_type().clone(), true),
+        Field::new(
+            "seconds_timezoned",
+            arr_seconds_timezoned.data_type().clone(),
+            true,
+        ),
         Field::new("name", arr_names.data_type().clone(), true),
     ]);
     let schema = Arc::new(schema);
@@ -403,9 +437,13 @@ fn make_timestamp_batch(offset: Duration) -> RecordBatch {
         schema,
         vec![
             Arc::new(arr_nanos),
+            Arc::new(arr_nanos_timezoned),
             Arc::new(arr_micros),
+            Arc::new(arr_micros_timezoned),
             Arc::new(arr_millis),
+            Arc::new(arr_millis_timezoned),
             Arc::new(arr_seconds),
+            Arc::new(arr_seconds_timezoned),
             Arc::new(arr_names),
         ],
     )
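A closing note on the expected values used throughout the rewritten tests: `TimestampSecondType::parse` and friends come from arrow's `Parser` trait (imported at the top of `arrow_statistics.rs` above) and return `Option<i64>` in the unit of the type parameter, which is how the new assertions replace the old hard-coded `Int64` literals. A minimal sketch of the equivalence:

```rust
use arrow::compute::kernels::cast_utils::Parser;
use arrow::datatypes::{TimestampNanosecondType, TimestampSecondType};

fn main() {
    // 2020-01-01T01:01:01 is 1577840461 seconds after the epoch -- exactly
    // the raw i64 the removed `Int64Array` expectations hard-coded.
    assert_eq!(
        TimestampSecondType::parse("2020-01-01T01:01:01"),
        Some(1577840461)
    );
    // The same instant in nanoseconds, matching the old "nanos" literals.
    assert_eq!(
        TimestampNanosecondType::parse("2020-01-01T01:01:01"),
        Some(1_577_840_461_000_000_000)
    );
}
```

The timezoned columns reuse these same parsed instants and only attach `"Pacific/Efate"` via `with_timezone`, so min/max values agree between each `*_timezoned` column and its naive sibling while their data types differ.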