Skip to content

Commit

Permalink
Got Infinity values to work
Browse files Browse the repository at this point in the history
  • Loading branch information
AndrewJackson2020 committed Feb 14, 2024
1 parent 05283cf commit 41c7896
Show file tree
Hide file tree
Showing 3 changed files with 184 additions and 16 deletions.
85 changes: 69 additions & 16 deletions connectorx/src/sources/postgres/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,7 @@ macro_rules! impl_csv_produce {
};
}

impl_csv_produce!(i8, i16, i32, i64, f32, f64, Decimal, Uuid,);
impl_csv_produce!(i8, i16, i32, i64, f32, f64, Uuid,);

macro_rules! impl_csv_vec_produce {
($($t: ty,)+) => {
Expand Down Expand Up @@ -754,17 +754,56 @@ impl<'r, 'a> Produce<'r, Option<Vec<bool>>> for PostgresCSVSourceParser<'a> {
}
}


impl<'r, 'a> Produce<'r, Decimal> for PostgresCSVSourceParser<'a> {
type Error = PostgresSourceError;

#[throws(PostgresSourceError)]
fn produce(&'r mut self) -> Decimal {
let (ridx, cidx) = self.next_loc()?;
match &self.rowbuf[ridx][cidx][..] {
"Infinity" => Decimal::MAX,
"-Infinity" => Decimal::MIN,
v => v.parse().map_err(|_| {
ConnectorXError::cannot_produce::<Decimal>(Some(v.into()))
})?
}
}
}


impl<'r, 'a> Produce<'r, Option<Decimal>> for PostgresCSVSourceParser<'a> {
type Error = PostgresSourceError;

#[throws(PostgresSourceError)]
fn produce(&'r mut self) -> Option<Decimal> {
let (ridx, cidx) = self.next_loc()?;
match &self.rowbuf[ridx][cidx][..] {
"" => None,
"Infinity" => Some(Decimal::MAX),
"-Infinity" => Some(Decimal::MIN),
v => Some(v.parse().map_err(|_| {
ConnectorXError::cannot_produce::<Decimal>(Some(v.into()))
})?),
}
}
}


impl<'r, 'a> Produce<'r, DateTime<Utc>> for PostgresCSVSourceParser<'a> {
type Error = PostgresSourceError;

#[throws(PostgresSourceError)]
fn produce(&mut self) -> DateTime<Utc> {
let (ridx, cidx) = self.next_loc()?;
let s: &str = &self.rowbuf[ridx][cidx][..];
// postgres csv return example: 1970-01-01 00:00:01+00
format!("{}:00", s).parse().map_err(|_| {
ConnectorXError::cannot_produce::<DateTime<Utc>>(Some(self.rowbuf[ridx][cidx].into()))
})?
match &self.rowbuf[ridx][cidx][..] {
"infinity" => DateTime::<Utc>::MAX_UTC,
"-infinity" => DateTime::<Utc>::MIN_UTC,
// postgres csv return example: 1970-01-01 00:00:01+00
v => format!("{}:00", v).parse().map_err(|_| {
ConnectorXError::cannot_produce::<DateTime<Utc>>(Some(v.into()))
})?
}
}
}

Expand All @@ -776,6 +815,8 @@ impl<'r, 'a> Produce<'r, Option<DateTime<Utc>>> for PostgresCSVSourceParser<'a>
let (ridx, cidx) = self.next_loc()?;
match &self.rowbuf[ridx][cidx][..] {
"" => None,
"infinity" => Some(DateTime::<Utc>::MAX_UTC),
"-infinity" => Some(DateTime::<Utc>::MIN_UTC),
v => {
// postgres csv return example: 1970-01-01 00:00:01+00
Some(format!("{}:00", v).parse().map_err(|_| {
Expand All @@ -792,9 +833,13 @@ impl<'r, 'a> Produce<'r, NaiveDate> for PostgresCSVSourceParser<'a> {
#[throws(PostgresSourceError)]
fn produce(&mut self) -> NaiveDate {
let (ridx, cidx) = self.next_loc()?;
NaiveDate::parse_from_str(&self.rowbuf[ridx][cidx], "%Y-%m-%d").map_err(|_| {
ConnectorXError::cannot_produce::<NaiveDate>(Some(self.rowbuf[ridx][cidx].into()))
})?
match &self.rowbuf[ridx][cidx][..] {
"infinity" => NaiveDate::MAX,
"-infinity" => NaiveDate::MIN,
v => NaiveDate::parse_from_str(v, "%Y-%m-%d").map_err(|_| {
ConnectorXError::cannot_produce::<NaiveDate>(Some(v.into()))
})?
}
}
}

Expand All @@ -806,6 +851,8 @@ impl<'r, 'a> Produce<'r, Option<NaiveDate>> for PostgresCSVSourceParser<'a> {
let (ridx, cidx) = self.next_loc()?;
match &self.rowbuf[ridx][cidx][..] {
"" => None,
"infinity" => Some(NaiveDate::MAX),
"-infinity" => Some(NaiveDate::MIN),
v => Some(
NaiveDate::parse_from_str(v, "%Y-%m-%d")
.map_err(|_| ConnectorXError::cannot_produce::<NaiveDate>(Some(v.into())))?,
Expand All @@ -820,13 +867,17 @@ impl<'r, 'a> Produce<'r, NaiveDateTime> for PostgresCSVSourceParser<'a> {
#[throws(PostgresSourceError)]
fn produce(&mut self) -> NaiveDateTime {
let (ridx, cidx) = self.next_loc()?;
NaiveDateTime::parse_from_str(&self.rowbuf[ridx][cidx], "%Y-%m-%d %H:%M:%S").map_err(
|_| {
ConnectorXError::cannot_produce::<NaiveDateTime>(Some(
self.rowbuf[ridx][cidx].into(),
))
},
)?
match &self.rowbuf[ridx][cidx] {
"infinity" => NaiveDateTime::MAX,
"-infinity" => NaiveDateTime::MIN,
v => NaiveDateTime::parse_from_str(v, "%Y-%m-%d %H:%M:%S").map_err(
|_| {
ConnectorXError::cannot_produce::<NaiveDateTime>(Some(
v.into(),
))
},
)?
}
}
}

Expand All @@ -838,6 +889,8 @@ impl<'r, 'a> Produce<'r, Option<NaiveDateTime>> for PostgresCSVSourceParser<'a>
let (ridx, cidx) = self.next_loc()?;
match &self.rowbuf[ridx][cidx][..] {
"" => None,
"infinity" => Some(NaiveDateTime::MAX),
"-infinity" => Some(NaiveDateTime::MIN),
v => Some(
NaiveDateTime::parse_from_str(v, "%Y-%m-%d %H:%M:%S").map_err(|_| {
ConnectorXError::cannot_produce::<NaiveDateTime>(Some(v.into()))
Expand Down
103 changes: 103 additions & 0 deletions connectorx/tests/test_postgres.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use chrono::{NaiveDate, NaiveDateTime, DateTime, Utc};
use rust_decimal::Decimal;
use arrow::{
array::{BooleanArray, Float64Array, Int64Array, StringArray},
record_batch::RecordBatch,
Expand Down Expand Up @@ -149,6 +151,107 @@ fn test_postgres() {
verify_arrow_results(result);
}

#[test]
fn test_csv_infinite_values() {

let _ = env_logger::builder().is_test(true).try_init();

let dburl = env::var("POSTGRES_URL").unwrap();
#[derive(Debug, PartialEq)]
struct Row(i32, NaiveDate, NaiveDateTime, Decimal, DateTime<Utc>);

let url = Url::parse(dburl.as_str()).unwrap();
let (config, _tls) = rewrite_tls_args(&url).unwrap();
let mut source = PostgresSource::<CSVProtocol, NoTls>::new(config, NoTls, 1).unwrap();
source.set_queries(&[CXQuery::naked("select * from test_infinite_values")]);
source.fetch_metadata().unwrap();

let mut partitions = source.partition().unwrap();
assert!(partitions.len() == 1);
let mut partition = partitions.remove(0);
partition.result_rows().expect("run query");

assert_eq!(2, partition.nrows());
assert_eq!(5, partition.ncols());

let mut parser = partition.parser().unwrap();

let mut rows: Vec<Row> = Vec::new();
loop {
let (n, is_last) = parser.fetch_next().unwrap();
for _i in 0..n {
rows.push(Row(
parser.produce().unwrap(),
parser.produce().unwrap(),
parser.produce().unwrap(),
parser.produce().unwrap(),
parser.produce().unwrap(),
));
}
if is_last {
break;
}
}
assert_eq!(
vec![
Row(1, NaiveDate::MAX, NaiveDateTime::MAX, Decimal::MAX, DateTime::<Utc>::MAX_UTC),
Row(2, NaiveDate::MIN, NaiveDateTime::MIN, Decimal::MIN, DateTime::<Utc>::MIN_UTC),
],
rows
);
}

#[test]
fn test_csv_infinite_values_option() {

let _ = env_logger::builder().is_test(true).try_init();

let dburl = env::var("POSTGRES_URL").unwrap();
#[derive(Debug, PartialEq)]
struct Row(i32, Option<NaiveDate>, Option<NaiveDateTime>, Option<Decimal>, Option<DateTime<Utc>>);

let url = Url::parse(dburl.as_str()).unwrap();
let (config, _tls) = rewrite_tls_args(&url).unwrap();
let mut source = PostgresSource::<CSVProtocol, NoTls>::new(config, NoTls, 1).unwrap();
source.set_queries(&[CXQuery::naked("select * from test_infinite_values")]);
source.fetch_metadata().unwrap();

let mut partitions = source.partition().unwrap();
assert!(partitions.len() == 1);
let mut partition = partitions.remove(0);
partition.result_rows().expect("run query");

assert_eq!(2, partition.nrows());
assert_eq!(5, partition.ncols());

let mut parser = partition.parser().unwrap();

let mut rows: Vec<Row> = Vec::new();
loop {
let (n, is_last) = parser.fetch_next().unwrap();
for _i in 0..n {
rows.push(Row(
parser.produce().unwrap(),
parser.produce().unwrap(),
parser.produce().unwrap(),
parser.produce().unwrap(),
parser.produce().unwrap(),
));
}
if is_last {
break;
}
}
assert_eq!(
vec![
Row(1, Some(NaiveDate::MAX), Some(NaiveDateTime::MAX), Some(Decimal::MAX), Some(DateTime::<Utc>::MAX_UTC)),
Row(2, Some(NaiveDate::MIN), Some(NaiveDateTime::MIN), Some(Decimal::MIN), Some(DateTime::<Utc>::MIN_UTC)),
],
rows
);
}


#[test]
fn test_postgres_csv() {
let _ = env_logger::builder().is_test(true).try_init();
Expand Down
12 changes: 12 additions & 0 deletions scripts/postgres.sql
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
DROP TABLE IF EXISTS test_table;
DROP TABLE IF EXISTS test_str;
DROP TABLE IF EXISTS test_types;
DROP TABLE IF EXISTS test_infinite_values;
DROP TYPE IF EXISTS happiness;
DROP EXTENSION IF EXISTS citext;
DROP EXTENSION IF EXISTS ltree;
Expand All @@ -20,6 +21,17 @@ INSERT INTO test_table VALUES (3, 7, 'b', 3, FALSE);
INSERT INTO test_table VALUES (4, 9, 'c', 7.8, NULL);
INSERT INTO test_table VALUES (1314, 2, NULL, -10, TRUE);

CREATE TABLE IF NOT EXISTS test_infinite_values(
test_int INTEGER NOT NULL,
test_date DATE,
test_timestamp TIMESTAMP,
test_real REAL,
test_timestamp_timezone TIMESTAMP WITH TIME ZONE
);

INSERT INTO test_infinite_values VALUES (1, 'infinity'::DATE, 'infinity'::TIMESTAMP, 'infinity'::REAL, 'infinity'::TIMESTAMP);
INSERT INTO test_infinite_values VALUES (2, '-infinity'::DATE, '-infinity'::TIMESTAMP, '-infinity'::REAL, '-infinity'::TIMESTAMP);

CREATE TABLE IF NOT EXISTS test_str(
id INTEGER NOT NULL,
test_language TEXT,
Expand Down

0 comments on commit 41c7896

Please sign in to comment.