diff --git a/connectorx/src/sources/postgres/mod.rs b/connectorx/src/sources/postgres/mod.rs
index 784f2cd93..f4fbd4148 100644
--- a/connectorx/src/sources/postgres/mod.rs
+++ b/connectorx/src/sources/postgres/mod.rs
@@ -780,6 +780,8 @@ impl<'r, 'a> Produce<'r, Option<Decimal>> for PostgresCSVSourceParser<'a> {
         let (ridx, cidx) = self.next_loc()?;
         match &self.rowbuf[ridx][cidx][..] {
             "" => None,
+            "Infinity" => Some(Decimal::MAX),
+            "-Infinity" => Some(Decimal::MIN),
             v => Some(v.parse().map_err(|_| {
                 ConnectorXError::cannot_produce::<Decimal>(Some(v.into()))
             })?),
@@ -844,6 +846,8 @@ impl<'r, 'a> Produce<'r, Option<NaiveDate>> for PostgresCSVSourceParser<'a> {
         let (ridx, cidx) = self.next_loc()?;
         match &self.rowbuf[ridx][cidx][..] {
             "" => None,
+            "infinity" => Some(NaiveDate::MAX),
+            "-infinity" => Some(NaiveDate::MIN),
             v => Some(
                 NaiveDate::parse_from_str(v, "%Y-%m-%d")
                     .map_err(|_| ConnectorXError::cannot_produce::<NaiveDate>(Some(v.into())))?,
@@ -880,6 +884,8 @@ impl<'r, 'a> Produce<'r, Option<NaiveDateTime>> for PostgresCSVSourceParser<'a>
         let (ridx, cidx) = self.next_loc()?;
         match &self.rowbuf[ridx][cidx][..] {
             "" => None,
+            "infinity" => Some(NaiveDateTime::MAX),
+            "-infinity" => Some(NaiveDateTime::MIN),
             v => Some(
                 NaiveDateTime::parse_from_str(v, "%Y-%m-%d %H:%M:%S").map_err(|_| {
                     ConnectorXError::cannot_produce::<NaiveDateTime>(Some(v.into()))
diff --git a/connectorx/tests/test_postgres.rs b/connectorx/tests/test_postgres.rs
index e6a01e633..5bdc21670 100644
--- a/connectorx/tests/test_postgres.rs
+++ b/connectorx/tests/test_postgres.rs
@@ -200,6 +200,56 @@ fn test_csv_infinite_values() {
     );
 }
 
+#[test]
+fn test_csv_infinite_values_option() {
+
+    let _ = env_logger::builder().is_test(true).try_init();
+
+    let dburl = env::var("POSTGRES_URL").unwrap();
+    #[derive(Debug, PartialEq)]
+    struct Row(i32, Option<NaiveDate>, Option<NaiveDateTime>, Option<Decimal>);
+
+    let url = Url::parse(dburl.as_str()).unwrap();
+    let (config, _tls) = rewrite_tls_args(&url).unwrap();
+    let mut source = PostgresSource::<CSVProtocol, NoTls>::new(config, NoTls, 1).unwrap();
+    source.set_queries(&[CXQuery::naked("select * from test_infinite_values")]);
+    source.fetch_metadata().unwrap();
+
+    let mut partitions = source.partition().unwrap();
+    assert!(partitions.len() == 1);
+    let mut partition = partitions.remove(0);
+    partition.result_rows().expect("run query");
+
+    assert_eq!(2, partition.nrows());
+    assert_eq!(4, partition.ncols());
+
+    let mut parser = partition.parser().unwrap();
+
+    let mut rows: Vec<Row> = Vec::new();
+    loop {
+        let (n, is_last) = parser.fetch_next().unwrap();
+        for _i in 0..n {
+            rows.push(Row(
+                parser.produce().unwrap(),
+                parser.produce().unwrap(),
+                parser.produce().unwrap(),
+                parser.produce().unwrap(),
+            ));
+        }
+        if is_last {
+            break;
+        }
+    }
+    assert_eq!(
+        vec![
+            Row(1, Some(NaiveDate::MAX), Some(NaiveDateTime::MAX), Some(Decimal::MAX)),
+            Row(2, Some(NaiveDate::MIN), Some(NaiveDateTime::MIN), Some(Decimal::MIN)),
+        ],
+        rows
+    );
+}
+
+
 #[test]
 fn test_postgres_csv() {
     let _ = env_logger::builder().is_test(true).try_init();