diff --git a/connectorx/src/sources/postgres/mod.rs b/connectorx/src/sources/postgres/mod.rs index f4fbd4148..aed115979 100644 --- a/connectorx/src/sources/postgres/mod.rs +++ b/connectorx/src/sources/postgres/mod.rs @@ -796,11 +796,14 @@ impl<'r, 'a> Produce<'r, DateTime> for PostgresCSVSourceParser<'a> { #[throws(PostgresSourceError)] fn produce(&mut self) -> DateTime { let (ridx, cidx) = self.next_loc()?; - let s: &str = &self.rowbuf[ridx][cidx][..]; - // postgres csv return example: 1970-01-01 00:00:01+00 - format!("{}:00", s).parse().map_err(|_| { - ConnectorXError::cannot_produce::>(Some(self.rowbuf[ridx][cidx].into())) - })? + match &self.rowbuf[ridx][cidx][..] { + "infinity" => DateTime::::MAX_UTC, + "-infinity" => DateTime::::MIN_UTC, + // postgres csv return example: 1970-01-01 00:00:01+00 + v => format!("{}:00", v).parse().map_err(|_| { + ConnectorXError::cannot_produce::>(Some(v.into())) + })? + } } } @@ -812,6 +815,8 @@ impl<'r, 'a> Produce<'r, Option>> for PostgresCSVSourceParser<'a> let (ridx, cidx) = self.next_loc()?; match &self.rowbuf[ridx][cidx][..] { "" => None, + "infinity" => Some(DateTime::::MAX_UTC), + "-infinity" => Some(DateTime::::MIN_UTC), v => { // postgres csv return example: 1970-01-01 00:00:01+00 Some(format!("{}:00", v).parse().map_err(|_| { diff --git a/connectorx/tests/test_postgres.rs b/connectorx/tests/test_postgres.rs index 5bdc21670..8ad725384 100644 --- a/connectorx/tests/test_postgres.rs +++ b/connectorx/tests/test_postgres.rs @@ -1,4 +1,4 @@ -use chrono::{NaiveDate, NaiveDateTime}; +use chrono::{NaiveDate, NaiveDateTime, DateTime, Utc}; use rust_decimal::Decimal; use arrow::{ array::{BooleanArray, Float64Array, Int64Array, StringArray}, @@ -158,7 +158,7 @@ fn test_csv_infinite_values() { let dburl = env::var("POSTGRES_URL").unwrap(); #[derive(Debug, PartialEq)] - struct Row(i32, NaiveDate, NaiveDateTime, Decimal); + struct Row(i32, NaiveDate, NaiveDateTime, Decimal, DateTime); let url = Url::parse(dburl.as_str()).unwrap(); let (config, _tls) = rewrite_tls_args(&url).unwrap(); @@ -172,7 +172,7 @@ fn test_csv_infinite_values() { partition.result_rows().expect("run query"); assert_eq!(2, partition.nrows()); - assert_eq!(4, partition.ncols()); + assert_eq!(5, partition.ncols()); let mut parser = partition.parser().unwrap(); @@ -185,6 +185,7 @@ fn test_csv_infinite_values() { parser.produce().unwrap(), parser.produce().unwrap(), parser.produce().unwrap(), + parser.produce().unwrap(), )); } if is_last { @@ -193,8 +194,8 @@ fn test_csv_infinite_values() { } assert_eq!( vec![ - Row(1, NaiveDate::MAX, NaiveDateTime::MAX, Decimal::MAX), - Row(2, NaiveDate::MIN, NaiveDateTime::MIN, Decimal::MIN), + Row(1, NaiveDate::MAX, NaiveDateTime::MAX, Decimal::MAX, DateTime::::MAX_UTC), + Row(2, NaiveDate::MIN, NaiveDateTime::MIN, Decimal::MIN, DateTime::::MIN_UTC), ], rows ); @@ -207,7 +208,7 @@ fn test_csv_infinite_values_option() { let dburl = env::var("POSTGRES_URL").unwrap(); #[derive(Debug, PartialEq)] - struct Row(i32, Option, Option, Option); + struct Row(i32, Option, Option, Option, Option>); let url = Url::parse(dburl.as_str()).unwrap(); let (config, _tls) = rewrite_tls_args(&url).unwrap(); @@ -221,7 +222,7 @@ fn test_csv_infinite_values_option() { partition.result_rows().expect("run query"); assert_eq!(2, partition.nrows()); - assert_eq!(4, partition.ncols()); + assert_eq!(5, partition.ncols()); let mut parser = partition.parser().unwrap(); @@ -234,6 +235,7 @@ fn test_csv_infinite_values_option() { parser.produce().unwrap(), parser.produce().unwrap(), parser.produce().unwrap(), + parser.produce().unwrap(), )); } if is_last { @@ -242,8 +244,8 @@ fn test_csv_infinite_values_option() { } assert_eq!( vec![ - Row(1, Some(NaiveDate::MAX), Some(NaiveDateTime::MAX), Some(Decimal::MAX)), - Row(2, Some(NaiveDate::MIN), Some(NaiveDateTime::MIN), Some(Decimal::MIN)), + Row(1, Some(NaiveDate::MAX), Some(NaiveDateTime::MAX), Some(Decimal::MAX), Some(DateTime::::MAX_UTC)), + Row(2, Some(NaiveDate::MIN), Some(NaiveDateTime::MIN), Some(Decimal::MIN), Some(DateTime::::MIN_UTC)), ], rows ); diff --git a/scripts/postgres.sql b/scripts/postgres.sql index c60a4b864..b859cd8d5 100644 --- a/scripts/postgres.sql +++ b/scripts/postgres.sql @@ -25,11 +25,12 @@ CREATE TABLE IF NOT EXISTS test_infinite_values( test_int INTEGER NOT NULL, test_date DATE, test_timestamp TIMESTAMP, - test_real REAL + test_real REAL, + test_timestamp_timezone TIMESTAMP WITH TIME ZONE ); -INSERT INTO test_infinite_values VALUES (1, 'infinity'::DATE, 'infinity'::TIMESTAMP, 'infinity'::REAL); -INSERT INTO test_infinite_values VALUES (2, '-infinity'::DATE, '-infinity'::TIMESTAMP, '-infinity'::REAL); +INSERT INTO test_infinite_values VALUES (1, 'infinity'::DATE, 'infinity'::TIMESTAMP, 'infinity'::REAL, 'infinity'::TIMESTAMP); +INSERT INTO test_infinite_values VALUES (2, '-infinity'::DATE, '-infinity'::TIMESTAMP, '-infinity'::REAL, '-infinity'::TIMESTAMP); CREATE TABLE IF NOT EXISTS test_str( id INTEGER NOT NULL,