diff --git a/.gitignore b/.gitignore index e9b003dc5..47a4e8aa4 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ +*.swp + **/target .vscode connectorx-python/connectorx/*.so diff --git a/connectorx/src/sources/postgres/mod.rs b/connectorx/src/sources/postgres/mod.rs index bf8892e64..0799d17af 100644 --- a/connectorx/src/sources/postgres/mod.rs +++ b/connectorx/src/sources/postgres/mod.rs @@ -476,14 +476,113 @@ impl_produce!( &'r str, Vec, NaiveTime, - NaiveDateTime, - DateTime, - NaiveDate, + // NaiveDateTime, + // DateTime, + // NaiveDate, Uuid, Value, Vec, ); +impl<'r, 'a> Produce<'r, NaiveDateTime> for PostgresBinarySourcePartitionParser<'a> { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> NaiveDateTime { + let (ridx, cidx) = self.next_loc()?; + let row = &self.rowbuf[ridx]; + let val = row.try_get(cidx)?; + match val { + postgres::types::Timestamp::PosInfinity => NaiveDateTime::MAX, + postgres::types::Timestamp::NegInfinity => NaiveDateTime::MIN, + postgres::types::Timestamp::Value(t) => t, + } + } +} + +impl<'r, 'a> Produce<'r, Option> for PostgresBinarySourcePartitionParser<'a> { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> Option { + let (ridx, cidx) = self.next_loc()?; + let row = &self.rowbuf[ridx]; + let val = row.try_get(cidx)?; + match val { + Some(postgres::types::Timestamp::PosInfinity) => Some(NaiveDateTime::MAX), + Some(postgres::types::Timestamp::NegInfinity) => Some(NaiveDateTime::MIN), + Some(postgres::types::Timestamp::Value(t)) => t, + None => None + } + } +} + +impl<'r, 'a> Produce<'r, DateTime> for PostgresBinarySourcePartitionParser<'a> { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> DateTime { + let (ridx, cidx) = self.next_loc()?; + let row = &self.rowbuf[ridx]; + let val = row.try_get(cidx)?; + match val { + postgres::types::Timestamp::PosInfinity => DateTime::::MAX_UTC, + postgres::types::Timestamp::NegInfinity => DateTime::::MIN_UTC, + postgres::types::Timestamp::Value(t) => t, + } + } +} + +impl<'r, 'a> Produce<'r, Option>> for PostgresBinarySourcePartitionParser<'a> { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> Option> { + let (ridx, cidx) = self.next_loc()?; + let row = &self.rowbuf[ridx]; + let val = row.try_get(cidx)?; + match val { + Some(postgres::types::Timestamp::PosInfinity) => Some(DateTime::::MAX_UTC), + Some(postgres::types::Timestamp::NegInfinity) => Some(DateTime::::MIN_UTC), + Some(postgres::types::Timestamp::Value(t)) => t, + None => None + } + } +} + +impl<'r, 'a> Produce<'r, NaiveDate> for PostgresBinarySourcePartitionParser<'a> { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> NaiveDate { + let (ridx, cidx) = self.next_loc()?; + let row = &self.rowbuf[ridx]; + let val = row.try_get(cidx)?; + match val { + postgres::types::Date::PosInfinity => NaiveDate::MAX, + postgres::types::Date::NegInfinity => NaiveDate::MIN, + postgres::types::Date::Value(t) => t, + } + } +} + +impl<'r, 'a> Produce<'r, Option> for PostgresBinarySourcePartitionParser<'a> { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> Option { + let (ridx, cidx) = self.next_loc()?; + let row = &self.rowbuf[ridx]; + let val = row.try_get(cidx)?; + match val { + Some(postgres::types::Date::PosInfinity) => Some(NaiveDate::MAX), + Some(postgres::types::Date::NegInfinity) => Some(NaiveDate::MIN), + Some(postgres::types::Date::Value(t)) => t, + None => None + } + } +} + impl<'r, 'a> Produce<'r, HashMap>> for PostgresBinarySourcePartitionParser<'a> { @@ -601,7 +700,7 @@ macro_rules! impl_csv_produce { }; } -impl_csv_produce!(i8, i16, i32, i64, f32, f64, Decimal, Uuid,); +impl_csv_produce!(i8, i16, i32, i64, f32, f64, Uuid,); macro_rules! impl_csv_vec_produce { ($($t: ty,)+) => { @@ -754,17 +853,56 @@ impl<'r, 'a> Produce<'r, Option>> for PostgresCSVSourceParser<'a> { } } + +impl<'r, 'a> Produce<'r, Decimal> for PostgresCSVSourceParser<'a> { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> Decimal { + let (ridx, cidx) = self.next_loc()?; + match &self.rowbuf[ridx][cidx][..] { + "Infinity" => Decimal::MAX, + "-Infinity" => Decimal::MIN, + v => v.parse().map_err(|_| { + ConnectorXError::cannot_produce::(Some(v.into())) + })? + } + } +} + + +impl<'r, 'a> Produce<'r, Option> for PostgresCSVSourceParser<'a> { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> Option { + let (ridx, cidx) = self.next_loc()?; + match &self.rowbuf[ridx][cidx][..] { + "" => None, + "Infinity" => Some(Decimal::MAX), + "-Infinity" => Some(Decimal::MIN), + v => Some(v.parse().map_err(|_| { + ConnectorXError::cannot_produce::(Some(v.into())) + })?), + } + } +} + + impl<'r, 'a> Produce<'r, DateTime> for PostgresCSVSourceParser<'a> { type Error = PostgresSourceError; #[throws(PostgresSourceError)] fn produce(&mut self) -> DateTime { let (ridx, cidx) = self.next_loc()?; - let s: &str = &self.rowbuf[ridx][cidx][..]; - // postgres csv return example: 1970-01-01 00:00:01+00 - format!("{}:00", s).parse().map_err(|_| { - ConnectorXError::cannot_produce::>(Some(self.rowbuf[ridx][cidx].into())) - })? + match &self.rowbuf[ridx][cidx][..] { + "infinity" => DateTime::::MAX_UTC, + "-infinity" => DateTime::::MIN_UTC, + // postgres csv return example: 1970-01-01 00:00:01+00 + v => format!("{}:00", v).parse().map_err(|_| { + ConnectorXError::cannot_produce::>(Some(v.into())) + })? + } } } @@ -776,6 +914,8 @@ impl<'r, 'a> Produce<'r, Option>> for PostgresCSVSourceParser<'a> let (ridx, cidx) = self.next_loc()?; match &self.rowbuf[ridx][cidx][..] { "" => None, + "infinity" => Some(DateTime::::MAX_UTC), + "-infinity" => Some(DateTime::::MIN_UTC), v => { // postgres csv return example: 1970-01-01 00:00:01+00 Some(format!("{}:00", v).parse().map_err(|_| { @@ -792,9 +932,13 @@ impl<'r, 'a> Produce<'r, NaiveDate> for PostgresCSVSourceParser<'a> { #[throws(PostgresSourceError)] fn produce(&mut self) -> NaiveDate { let (ridx, cidx) = self.next_loc()?; - NaiveDate::parse_from_str(&self.rowbuf[ridx][cidx], "%Y-%m-%d").map_err(|_| { - ConnectorXError::cannot_produce::(Some(self.rowbuf[ridx][cidx].into())) - })? + match &self.rowbuf[ridx][cidx][..] { + "infinity" => NaiveDate::MAX, + "-infinity" => NaiveDate::MIN, + v => NaiveDate::parse_from_str(v, "%Y-%m-%d").map_err(|_| { + ConnectorXError::cannot_produce::(Some(v.into())) + })? + } } } @@ -806,6 +950,8 @@ impl<'r, 'a> Produce<'r, Option> for PostgresCSVSourceParser<'a> { let (ridx, cidx) = self.next_loc()?; match &self.rowbuf[ridx][cidx][..] { "" => None, + "infinity" => Some(NaiveDate::MAX), + "-infinity" => Some(NaiveDate::MIN), v => Some( NaiveDate::parse_from_str(v, "%Y-%m-%d") .map_err(|_| ConnectorXError::cannot_produce::(Some(v.into())))?, @@ -820,13 +966,17 @@ impl<'r, 'a> Produce<'r, NaiveDateTime> for PostgresCSVSourceParser<'a> { #[throws(PostgresSourceError)] fn produce(&mut self) -> NaiveDateTime { let (ridx, cidx) = self.next_loc()?; - NaiveDateTime::parse_from_str(&self.rowbuf[ridx][cidx], "%Y-%m-%d %H:%M:%S").map_err( - |_| { - ConnectorXError::cannot_produce::(Some( - self.rowbuf[ridx][cidx].into(), - )) - }, - )? + match &self.rowbuf[ridx][cidx] { + "infinity" => NaiveDateTime::MAX, + "-infinity" => NaiveDateTime::MIN, + v => NaiveDateTime::parse_from_str(v, "%Y-%m-%d %H:%M:%S").map_err( + |_| { + ConnectorXError::cannot_produce::(Some( + v.into(), + )) + }, + )? + } } } @@ -838,6 +988,8 @@ impl<'r, 'a> Produce<'r, Option> for PostgresCSVSourceParser<'a> let (ridx, cidx) = self.next_loc()?; match &self.rowbuf[ridx][cidx][..] { "" => None, + "infinity" => Some(NaiveDateTime::MAX), + "-infinity" => Some(NaiveDateTime::MIN), v => Some( NaiveDateTime::parse_from_str(v, "%Y-%m-%d %H:%M:%S").map_err(|_| { ConnectorXError::cannot_produce::(Some(v.into())) @@ -1059,15 +1211,114 @@ impl_produce!( &'r str, Vec, NaiveTime, - NaiveDateTime, - DateTime, - NaiveDate, Uuid, Value, HashMap>, Vec, ); +impl<'r, 'a> Produce<'r, DateTime> for PostgresRawSourceParser<'a> { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> DateTime { + let (ridx, cidx) = self.next_loc()?; + let row = &self.rowbuf[ridx]; + let val: postgres::types::Timestamp> = row.try_get(cidx)?; + match val { + postgres::types::Timestamp::PosInfinity => DateTime::::MAX_UTC, + postgres::types::Timestamp::NegInfinity => DateTime::::MIN_UTC, + postgres::types::Timestamp::Value(t) => t, + } + } +} + +impl<'r, 'a> Produce<'r, Option>> for PostgresRawSourceParser<'a> { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> Option> { + let (ridx, cidx) = self.next_loc()?; + let row = &self.rowbuf[ridx]; + let val = row.try_get(cidx)?; + match val { + Some(postgres::types::Timestamp::PosInfinity) => Some(DateTime::::MAX_UTC), + Some(postgres::types::Timestamp::NegInfinity) => Some(DateTime::::MIN_UTC), + Some(postgres::types::Timestamp::Value(t)) => t, + None => None + } + + } +} + +impl<'r, 'a> Produce<'r, NaiveDateTime> for PostgresRawSourceParser<'a> { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> NaiveDateTime { + let (ridx, cidx) = self.next_loc()?; + let row = &self.rowbuf[ridx]; + let val: postgres::types::Timestamp = row.try_get(cidx)?; + match val { + postgres::types::Timestamp::PosInfinity => NaiveDateTime::MAX, + postgres::types::Timestamp::NegInfinity => NaiveDateTime::MIN, + postgres::types::Timestamp::Value(t) => t, + } + } +} + +impl<'r, 'a> Produce<'r, Option> for PostgresRawSourceParser<'a> { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> Option { + let (ridx, cidx) = self.next_loc()?; + let row = &self.rowbuf[ridx]; + let val = row.try_get(cidx)?; + match val { + Some(postgres::types::Timestamp::PosInfinity) => Some(NaiveDateTime::MAX), + Some(postgres::types::Timestamp::NegInfinity) => Some(NaiveDateTime::MIN), + Some(postgres::types::Timestamp::Value(t)) => t, + None => None + } + + } +} + +impl<'r, 'a> Produce<'r, NaiveDate> for PostgresRawSourceParser<'a> { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> NaiveDate { + let (ridx, cidx) = self.next_loc()?; + let row = &self.rowbuf[ridx]; + let val: postgres::types::Date = row.try_get(cidx)?; + match val { + postgres::types::Date::PosInfinity => NaiveDate::MAX, + postgres::types::Date::NegInfinity => NaiveDate::MIN, + postgres::types::Date::Value(t) => t, + } + } +} + +impl<'r, 'a> Produce<'r, Option> for PostgresRawSourceParser<'a> { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> Option { + let (ridx, cidx) = self.next_loc()?; + let row = &self.rowbuf[ridx]; + let val = row.try_get(cidx)?; + match val { + Some(postgres::types::Date::PosInfinity) => Some(NaiveDate::MAX), + Some(postgres::types::Date::NegInfinity) => Some(NaiveDate::MIN), + Some(postgres::types::Date::Value(t)) => t, + None => None, + } + + } +} + impl SourcePartition for PostgresSourcePartition where C: MakeTlsConnect + Clone + 'static + Sync + Send, @@ -1217,7 +1468,63 @@ macro_rules! impl_simple_produce { }; } -impl_simple_produce!(i8, i16, i32, i64, f32, f64, Decimal, Uuid, bool,); +impl_simple_produce!(i8, i16, i32, i64, f32, f64, Uuid, bool,); + +impl<'r> Produce<'r, Decimal> for PostgresSimpleSourceParser { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> Decimal { + let (ridx, cidx) = self.next_loc()?; + let val = match &self.rows[ridx] { + SimpleQueryMessage::Row(row) => match row.try_get(cidx)? { + Some("Infinity") => Decimal::MAX, + Some("-Infinity") => Decimal::MIN, + Some(s) => s + .parse() + .map_err(|_| ConnectorXError::cannot_produce::(Some(s.into())))?, + None => throw!(anyhow!( + "Cannot parse NULL in NOT NULL column." + )), + }, + SimpleQueryMessage::CommandComplete(c) => { + panic!("get command: {}", c); + } + _ => { + panic!("what?"); + } + }; + val + } +} + +impl<'r, 'a> Produce<'r, Option> for PostgresSimpleSourceParser { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> Option { + let (ridx, cidx) = self.next_loc()?; + let val = match &self.rows[ridx] { + SimpleQueryMessage::Row(row) => match row.try_get(cidx)? { + Some("Infinity") => Some(Decimal::MAX), + Some("-Infinity") => Some(Decimal::MIN), + Some(s) => Some( + s.parse() + .map_err(|_| ConnectorXError::cannot_produce::(Some(s.into())))?, + ), + None => None, + }, + SimpleQueryMessage::CommandComplete(c) => { + panic!("get command: {}", c); + } + _ => { + panic!("what?"); + } + }; + val + } +} + impl_simple_produce_unimplemented!( Value, HashMap>,); @@ -1477,8 +1784,12 @@ impl<'r> Produce<'r, NaiveDate> for PostgresSimpleSourceParser { let (ridx, cidx) = self.next_loc()?; let val = match &self.rows[ridx] { SimpleQueryMessage::Row(row) => match row.try_get(cidx)? { - Some(s) => NaiveDate::parse_from_str(s, "%Y-%m-%d") - .map_err(|_| ConnectorXError::cannot_produce::(Some(s.into())))?, + Some(s) => match s { + "infinity" => NaiveDate::MAX, + "-infinity" => NaiveDate::MIN, + s => NaiveDate::parse_from_str(s, "%Y-%m-%d") + .map_err(|_| ConnectorXError::cannot_produce::(Some(s.into())))?, + } None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")), }, SimpleQueryMessage::CommandComplete(c) => { @@ -1500,9 +1811,13 @@ impl<'r> Produce<'r, Option> for PostgresSimpleSourceParser { let (ridx, cidx) = self.next_loc()?; let val = match &self.rows[ridx] { SimpleQueryMessage::Row(row) => match row.try_get(cidx)? { - Some(s) => Some(NaiveDate::parse_from_str(s, "%Y-%m-%d").map_err(|_| { - ConnectorXError::cannot_produce::>(Some(s.into())) - })?), + Some(s) => match s { + "infinity" => Some(NaiveDate::MAX), + "-infinity" => Some(NaiveDate::MIN), + s => Some(NaiveDate::parse_from_str(s, "%Y-%m-%d").map_err(|_| { + ConnectorXError::cannot_produce::>(Some(s.into())) + })?), + }, None => None, }, SimpleQueryMessage::CommandComplete(c) => { @@ -1571,9 +1886,13 @@ impl<'r> Produce<'r, NaiveDateTime> for PostgresSimpleSourceParser { let (ridx, cidx) = self.next_loc()?; let val = match &self.rows[ridx] { SimpleQueryMessage::Row(row) => match row.try_get(cidx)? { - Some(s) => NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S").map_err(|_| { - ConnectorXError::cannot_produce::(Some(s.into())) - })?, + Some(s) => match s { + "infinity" => NaiveDateTime::MAX, + "-infinity" => NaiveDateTime::MIN, + s => NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S").map_err(|_| { + ConnectorXError::cannot_produce::(Some(s.into())) + })?, + }, None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")), }, SimpleQueryMessage::CommandComplete(c) => { @@ -1595,11 +1914,15 @@ impl<'r> Produce<'r, Option> for PostgresSimpleSourceParser { let (ridx, cidx) = self.next_loc()?; let val = match &self.rows[ridx] { SimpleQueryMessage::Row(row) => match row.try_get(cidx)? { - Some(s) => Some( - NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S").map_err(|_| { - ConnectorXError::cannot_produce::>(Some(s.into())) - })?, - ), + Some(s) => match s { + "infinity" => Some(NaiveDateTime::MAX), + "-infinity" => Some(NaiveDateTime::MIN), + s => Some( + NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S").map_err(|_| { + ConnectorXError::cannot_produce::>(Some(s.into())) + })?, + ), + }, None => None, }, SimpleQueryMessage::CommandComplete(c) => { @@ -1621,6 +1944,8 @@ impl<'r> Produce<'r, DateTime> for PostgresSimpleSourceParser { let (ridx, cidx) = self.next_loc()?; let val = match &self.rows[ridx] { SimpleQueryMessage::Row(row) => match row.try_get(cidx)? { + Some("infinity") => DateTime::::MAX_UTC, + Some("-infinity") => DateTime::::MIN_UTC, Some(s) => { let time_string = format!("{}:00", s).to_owned(); let slice: &str = &time_string[..]; @@ -1650,6 +1975,8 @@ impl<'r> Produce<'r, Option>> for PostgresSimpleSourceParser { let (ridx, cidx) = self.next_loc()?; let val = match &self.rows[ridx] { SimpleQueryMessage::Row(row) => match row.try_get(cidx)? { + Some("infinity") => Some(DateTime::::MAX_UTC), + Some("-infinity") => Some(DateTime::::MIN_UTC), Some(s) => { let time_string = format!("{}:00", s).to_owned(); let slice: &str = &time_string[..]; diff --git a/connectorx/tests/test_postgres.rs b/connectorx/tests/test_postgres.rs index 2beb6f14f..6ece56f65 100644 --- a/connectorx/tests/test_postgres.rs +++ b/connectorx/tests/test_postgres.rs @@ -1,3 +1,5 @@ +use chrono::{NaiveDate, NaiveDateTime, DateTime, Utc}; +use rust_decimal::Decimal; use arrow::{ array::{BooleanArray, Float64Array, Int64Array, StringArray}, record_batch::RecordBatch, @@ -5,7 +7,7 @@ use arrow::{ use connectorx::{ destinations::arrow::ArrowDestination, prelude::*, - sources::postgres::{rewrite_tls_args, BinaryProtocol, CSVProtocol, PostgresSource}, + sources::postgres::{rewrite_tls_args, BinaryProtocol, CSVProtocol, PostgresSource, SimpleProtocol, CursorProtocol}, sources::PartitionParser, sql::CXQuery, transports::PostgresArrowTransport, @@ -149,6 +151,355 @@ fn test_postgres() { verify_arrow_results(result); } +#[test] +fn test_csv_infinite_values_binary_proto_option() { + + let _ = env_logger::builder().is_test(true).try_init(); + + let dburl = env::var("POSTGRES_URL").unwrap(); + #[derive(Debug, PartialEq)] + struct Row(Option, Option, Option>); + + let url = Url::parse(dburl.as_str()).unwrap(); + let (config, _tls) = rewrite_tls_args(&url).unwrap(); + let mut source = PostgresSource::::new(config, NoTls, 1).unwrap(); + source.set_queries(&[CXQuery::naked("SELECT test_date, test_timestamp, test_timestamp_timezone FROM test_infinite_values")]); + source.fetch_metadata().unwrap(); + + let mut partitions = source.partition().unwrap(); + assert!(partitions.len() == 1); + let mut partition = partitions.remove(0); + partition.result_rows().expect("run query"); + + assert_eq!(3, partition.nrows()); + assert_eq!(3, partition.ncols()); + + let mut parser = partition.parser().unwrap(); + + let mut rows: Vec = Vec::new(); + loop { + let (n, is_last) = parser.fetch_next().unwrap(); + for _i in 0..n { + rows.push(Row( + parser.produce().unwrap(), + parser.produce().unwrap(), + parser.produce().unwrap(), + )); + } + if is_last { + break; + } + } + assert_eq!( + vec![ + Row(Some(NaiveDate::MAX), Some(NaiveDateTime::MAX), Some(DateTime::::MAX_UTC)), + Row(Some(NaiveDate::MIN), Some(NaiveDateTime::MIN), Some(DateTime::::MIN_UTC)), + Row(None, None, None), + ], + rows + ); +} + +#[test] +fn test_infinite_values_cursor_proto_option() { + + let _ = env_logger::builder().is_test(true).try_init(); + + let dburl = env::var("POSTGRES_URL").unwrap(); + #[derive(Debug, PartialEq)] + struct Row(Option, Option, Option>); + + let url = Url::parse(dburl.as_str()).unwrap(); + let (config, _tls) = rewrite_tls_args(&url).unwrap(); + let mut source = PostgresSource::::new(config, NoTls, 1).unwrap(); + source.set_queries(&[CXQuery::naked("SELECT test_date, test_timestamp, test_timestamp_timezone FROM test_infinite_values")]); + source.fetch_metadata().unwrap(); + + let mut partitions = source.partition().unwrap(); + assert!(partitions.len() == 1); + let mut partition = partitions.remove(0); + partition.result_rows().expect("run query"); + + assert_eq!(3, partition.nrows()); + assert_eq!(3, partition.ncols()); + + let mut parser = partition.parser().unwrap(); + + let mut rows: Vec = Vec::new(); + loop { + let (n, is_last) = parser.fetch_next().unwrap(); + for _i in 0..n { + rows.push(Row( + parser.produce().unwrap(), + parser.produce().unwrap(), + parser.produce().unwrap(), + )); + } + if is_last { + break; + } + } + assert_eq!( + vec![ + Row(Some(NaiveDate::MAX), Some(NaiveDateTime::MAX), Some(DateTime::::MAX_UTC)), + Row(Some(NaiveDate::MIN), Some(NaiveDateTime::MIN), Some(DateTime::::MIN_UTC)), + Row(None, None, None), + ], + rows + ); +} + +#[test] +fn test_csv_infinite_values_cursor_proto() { + + let _ = env_logger::builder().is_test(true).try_init(); + + let dburl = env::var("POSTGRES_URL").unwrap(); + #[derive(Debug, PartialEq)] + struct Row(NaiveDate, NaiveDateTime, DateTime); + + let url = Url::parse(dburl.as_str()).unwrap(); + let (config, _tls) = rewrite_tls_args(&url).unwrap(); + let mut source = PostgresSource::::new(config, NoTls, 1).unwrap(); + source.set_queries(&[CXQuery::naked("SELECT test_date, test_timestamp, test_timestamp_timezone FROM test_infinite_values WHERE test_date IS NOT NULL")]); + source.fetch_metadata().unwrap(); + + let mut partitions = source.partition().unwrap(); + assert!(partitions.len() == 1); + let mut partition = partitions.remove(0); + partition.result_rows().expect("run query"); + + assert_eq!(2, partition.nrows()); + assert_eq!(3, partition.ncols()); + + let mut parser = partition.parser().unwrap(); + + let mut rows: Vec = Vec::new(); + loop { + let (n, is_last) = parser.fetch_next().unwrap(); + for _i in 0..n { + rows.push(Row( + parser.produce().unwrap(), + parser.produce().unwrap(), + parser.produce().unwrap(), + )); + } + if is_last { + break; + } + } + assert_eq!( + vec![ + Row(NaiveDate::MAX, NaiveDateTime::MAX, DateTime::::MAX_UTC), + Row(NaiveDate::MIN, NaiveDateTime::MIN, DateTime::::MIN_UTC), + ], + rows + ); +} + +#[test] +fn test_csv_infinite_values_simple_proto() { + + let _ = env_logger::builder().is_test(true).try_init(); + + let dburl = env::var("POSTGRES_URL").unwrap(); + #[derive(Debug, PartialEq)] + struct Row(i32, NaiveDate, NaiveDateTime, Decimal, DateTime); + + let url = Url::parse(dburl.as_str()).unwrap(); + let (config, _tls) = rewrite_tls_args(&url).unwrap(); + let mut source = PostgresSource::::new(config, NoTls, 1).unwrap(); + source.set_queries(&[CXQuery::naked("select * from test_infinite_values WHERE test_date IS NOT NULL")]); + source.fetch_metadata().unwrap(); + + let mut partitions = source.partition().unwrap(); + assert!(partitions.len() == 1); + let mut partition = partitions.remove(0); + partition.result_rows().expect("run query"); + + assert_eq!(2, partition.nrows()); + assert_eq!(5, partition.ncols()); + + let mut parser = partition.parser().unwrap(); + + let mut rows: Vec = Vec::new(); + loop { + let (n, is_last) = parser.fetch_next().unwrap(); + for _i in 0..n { + rows.push(Row( + parser.produce().unwrap(), + parser.produce().unwrap(), + parser.produce().unwrap(), + parser.produce().unwrap(), + parser.produce().unwrap(), + )); + } + if is_last { + break; + } + } + assert_eq!( + vec![ + Row(1, NaiveDate::MAX, NaiveDateTime::MAX, Decimal::MAX, DateTime::::MAX_UTC), + Row(2, NaiveDate::MIN, NaiveDateTime::MIN, Decimal::MIN, DateTime::::MIN_UTC), + ], + rows + ); +} + +#[test] +fn test_csv_infinite_values_simple_proto_option() { + + let _ = env_logger::builder().is_test(true).try_init(); + + let dburl = env::var("POSTGRES_URL").unwrap(); + #[derive(Debug, PartialEq)] + struct Row(i32, Option, Option, Option, Option>); + + let url = Url::parse(dburl.as_str()).unwrap(); + let (config, _tls) = rewrite_tls_args(&url).unwrap(); + let mut source = PostgresSource::::new(config, NoTls, 1).unwrap(); + source.set_queries(&[CXQuery::naked("select * from test_infinite_values")]); + source.fetch_metadata().unwrap(); + + let mut partitions = source.partition().unwrap(); + assert!(partitions.len() == 1); + let mut partition = partitions.remove(0); + partition.result_rows().expect("run query"); + + assert_eq!(3, partition.nrows()); + assert_eq!(5, partition.ncols()); + + let mut parser = partition.parser().unwrap(); + + let mut rows: Vec = Vec::new(); + loop { + let (n, is_last) = parser.fetch_next().unwrap(); + for _i in 0..n { + rows.push(Row( + parser.produce().unwrap(), + parser.produce().unwrap(), + parser.produce().unwrap(), + parser.produce().unwrap(), + parser.produce().unwrap(), + )); + } + if is_last { + break; + } + } + assert_eq!( + vec![ + Row(1, Some(NaiveDate::MAX), Some(NaiveDateTime::MAX), Some(Decimal::MAX), Some(DateTime::::MAX_UTC)), + Row(2, Some(NaiveDate::MIN), Some(NaiveDateTime::MIN), Some(Decimal::MIN), Some(DateTime::::MIN_UTC)), + Row(3, None, None, None, None, ) + ], + rows + ); +} + +#[test] +fn test_csv_infinite_values() { + + let _ = env_logger::builder().is_test(true).try_init(); + + let dburl = env::var("POSTGRES_URL").unwrap(); + #[derive(Debug, PartialEq)] + struct Row(i32, NaiveDate, NaiveDateTime, Decimal, DateTime); + + let url = Url::parse(dburl.as_str()).unwrap(); + let (config, _tls) = rewrite_tls_args(&url).unwrap(); + let mut source = PostgresSource::::new(config, NoTls, 1).unwrap(); + source.set_queries(&[CXQuery::naked("select * from test_infinite_values WHERE test_date IS NOT NULL")]); + source.fetch_metadata().unwrap(); + + let mut partitions = source.partition().unwrap(); + assert!(partitions.len() == 1); + let mut partition = partitions.remove(0); + partition.result_rows().expect("run query"); + + assert_eq!(2, partition.nrows()); + assert_eq!(5, partition.ncols()); + + let mut parser = partition.parser().unwrap(); + + let mut rows: Vec = Vec::new(); + loop { + let (n, is_last) = parser.fetch_next().unwrap(); + for _i in 0..n { + rows.push(Row( + parser.produce().unwrap(), + parser.produce().unwrap(), + parser.produce().unwrap(), + parser.produce().unwrap(), + parser.produce().unwrap(), + )); + } + if is_last { + break; + } + } + assert_eq!( + vec![ + Row(1, NaiveDate::MAX, NaiveDateTime::MAX, Decimal::MAX, DateTime::::MAX_UTC), + Row(2, NaiveDate::MIN, NaiveDateTime::MIN, Decimal::MIN, DateTime::::MIN_UTC), + ], + rows + ); +} + +#[test] +fn test_csv_infinite_values_option() { + + let _ = env_logger::builder().is_test(true).try_init(); + + let dburl = env::var("POSTGRES_URL").unwrap(); + #[derive(Debug, PartialEq)] + struct Row(i32, Option, Option, Option, Option>); + + let url = Url::parse(dburl.as_str()).unwrap(); + let (config, _tls) = rewrite_tls_args(&url).unwrap(); + let mut source = PostgresSource::::new(config, NoTls, 1).unwrap(); + source.set_queries(&[CXQuery::naked("select * from test_infinite_values")]); + source.fetch_metadata().unwrap(); + + let mut partitions = source.partition().unwrap(); + assert!(partitions.len() == 1); + let mut partition = partitions.remove(0); + partition.result_rows().expect("run query"); + + assert_eq!(3, partition.nrows()); + assert_eq!(5, partition.ncols()); + + let mut parser = partition.parser().unwrap(); + + let mut rows: Vec = Vec::new(); + loop { + let (n, is_last) = parser.fetch_next().unwrap(); + for _i in 0..n { + rows.push(Row( + parser.produce().unwrap(), + parser.produce().unwrap(), + parser.produce().unwrap(), + parser.produce().unwrap(), + parser.produce().unwrap(), + )); + } + if is_last { + break; + } + } + assert_eq!( + vec![ + Row(1, Some(NaiveDate::MAX), Some(NaiveDateTime::MAX), Some(Decimal::MAX), Some(DateTime::::MAX_UTC)), + Row(2, Some(NaiveDate::MIN), Some(NaiveDateTime::MIN), Some(Decimal::MIN), Some(DateTime::::MIN_UTC)), + Row(3, None, None, None, None, ) + ], + rows + ); +} + + #[test] fn test_postgres_csv() { let _ = env_logger::builder().is_test(true).try_init(); diff --git a/scripts/postgres.sql b/scripts/postgres.sql index 559f22273..9b9bb1b04 100644 --- a/scripts/postgres.sql +++ b/scripts/postgres.sql @@ -1,6 +1,7 @@ DROP TABLE IF EXISTS test_table; DROP TABLE IF EXISTS test_str; DROP TABLE IF EXISTS test_types; +DROP TABLE IF EXISTS test_infinite_values; DROP TYPE IF EXISTS happiness; DROP EXTENSION IF EXISTS citext; DROP EXTENSION IF EXISTS ltree; @@ -20,6 +21,19 @@ INSERT INTO test_table VALUES (3, 7, 'b', 3, FALSE); INSERT INTO test_table VALUES (4, 9, 'c', 7.8, NULL); INSERT INTO test_table VALUES (1314, 2, NULL, -10, TRUE); +CREATE TABLE IF NOT EXISTS test_infinite_values( + test_int INTEGER NOT NULL, + test_date DATE, + test_timestamp TIMESTAMP, + test_real REAL, + test_timestamp_timezone TIMESTAMP WITH TIME ZONE +); + +INSERT INTO test_infinite_values VALUES (1, 'infinity'::DATE, 'infinity'::TIMESTAMP, 'infinity'::REAL, 'infinity'::TIMESTAMP); +INSERT INTO test_infinite_values VALUES (2, '-infinity'::DATE, '-infinity'::TIMESTAMP, '-infinity'::REAL, '-infinity'::TIMESTAMP); +INSERT INTO test_infinite_values VALUES (3,NULL, NULL, NULL, NULL); + + CREATE TABLE IF NOT EXISTS test_str( id INTEGER NOT NULL, test_language TEXT,