Skip to content

Commit

Permalink
Fixed infinite bug for Utc CSV
Browse files Browse the repository at this point in the history
  • Loading branch information
AndrewJackson2020 committed Feb 14, 2024
1 parent 371fd27 commit 2a9db84
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 17 deletions.
15 changes: 10 additions & 5 deletions connectorx/src/sources/postgres/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -796,11 +796,14 @@ impl<'r, 'a> Produce<'r, DateTime<Utc>> for PostgresCSVSourceParser<'a> {
#[throws(PostgresSourceError)]
fn produce(&mut self) -> DateTime<Utc> {
let (ridx, cidx) = self.next_loc()?;
let s: &str = &self.rowbuf[ridx][cidx][..];
// postgres csv return example: 1970-01-01 00:00:01+00
format!("{}:00", s).parse().map_err(|_| {
ConnectorXError::cannot_produce::<DateTime<Utc>>(Some(self.rowbuf[ridx][cidx].into()))
})?
match &self.rowbuf[ridx][cidx][..] {
"infinity" => DateTime::<Utc>::MAX_UTC,
"-infinity" => DateTime::<Utc>::MIN_UTC,
// postgres csv return example: 1970-01-01 00:00:01+00
v => format!("{}:00", v).parse().map_err(|_| {
ConnectorXError::cannot_produce::<DateTime<Utc>>(Some(v.into()))
})?
}
}
}

Expand All @@ -812,6 +815,8 @@ impl<'r, 'a> Produce<'r, Option<DateTime<Utc>>> for PostgresCSVSourceParser<'a>
let (ridx, cidx) = self.next_loc()?;
match &self.rowbuf[ridx][cidx][..] {
"" => None,
"infinity" => Some(DateTime::<Utc>::MAX_UTC),
"-infinity" => Some(DateTime::<Utc>::MIN_UTC),
v => {
// postgres csv return example: 1970-01-01 00:00:01+00
Some(format!("{}:00", v).parse().map_err(|_| {
Expand Down
20 changes: 11 additions & 9 deletions connectorx/tests/test_postgres.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use chrono::{NaiveDate, NaiveDateTime};
use chrono::{NaiveDate, NaiveDateTime, DateTime, Utc};
use rust_decimal::Decimal;
use arrow::{
array::{BooleanArray, Float64Array, Int64Array, StringArray},
Expand Down Expand Up @@ -158,7 +158,7 @@ fn test_csv_infinite_values() {

let dburl = env::var("POSTGRES_URL").unwrap();
#[derive(Debug, PartialEq)]
struct Row(i32, NaiveDate, NaiveDateTime, Decimal);
struct Row(i32, NaiveDate, NaiveDateTime, Decimal, DateTime<Utc>);

let url = Url::parse(dburl.as_str()).unwrap();
let (config, _tls) = rewrite_tls_args(&url).unwrap();
Expand All @@ -172,7 +172,7 @@ fn test_csv_infinite_values() {
partition.result_rows().expect("run query");

assert_eq!(2, partition.nrows());
assert_eq!(4, partition.ncols());
assert_eq!(5, partition.ncols());

let mut parser = partition.parser().unwrap();

Expand All @@ -185,6 +185,7 @@ fn test_csv_infinite_values() {
parser.produce().unwrap(),
parser.produce().unwrap(),
parser.produce().unwrap(),
parser.produce().unwrap(),
));
}
if is_last {
Expand All @@ -193,8 +194,8 @@ fn test_csv_infinite_values() {
}
assert_eq!(
vec![
Row(1, NaiveDate::MAX, NaiveDateTime::MAX, Decimal::MAX),
Row(2, NaiveDate::MIN, NaiveDateTime::MIN, Decimal::MIN),
Row(1, NaiveDate::MAX, NaiveDateTime::MAX, Decimal::MAX, DateTime::<Utc>::MAX_UTC),
Row(2, NaiveDate::MIN, NaiveDateTime::MIN, Decimal::MIN, DateTime::<Utc>::MIN_UTC),
],
rows
);
Expand All @@ -207,7 +208,7 @@ fn test_csv_infinite_values_option() {

let dburl = env::var("POSTGRES_URL").unwrap();
#[derive(Debug, PartialEq)]
struct Row(i32, Option<NaiveDate>, Option<NaiveDateTime>, Option<Decimal>);
struct Row(i32, Option<NaiveDate>, Option<NaiveDateTime>, Option<Decimal>, Option<DateTime<Utc>>);

let url = Url::parse(dburl.as_str()).unwrap();
let (config, _tls) = rewrite_tls_args(&url).unwrap();
Expand All @@ -221,7 +222,7 @@ fn test_csv_infinite_values_option() {
partition.result_rows().expect("run query");

assert_eq!(2, partition.nrows());
assert_eq!(4, partition.ncols());
assert_eq!(5, partition.ncols());

let mut parser = partition.parser().unwrap();

Expand All @@ -234,6 +235,7 @@ fn test_csv_infinite_values_option() {
parser.produce().unwrap(),
parser.produce().unwrap(),
parser.produce().unwrap(),
parser.produce().unwrap(),
));
}
if is_last {
Expand All @@ -242,8 +244,8 @@ fn test_csv_infinite_values_option() {
}
assert_eq!(
vec![
Row(1, Some(NaiveDate::MAX), Some(NaiveDateTime::MAX), Some(Decimal::MAX)),
Row(2, Some(NaiveDate::MIN), Some(NaiveDateTime::MIN), Some(Decimal::MIN)),
Row(1, Some(NaiveDate::MAX), Some(NaiveDateTime::MAX), Some(Decimal::MAX), Some(DateTime::<Utc>::MAX_UTC)),
Row(2, Some(NaiveDate::MIN), Some(NaiveDateTime::MIN), Some(Decimal::MIN), Some(DateTime::<Utc>::MIN_UTC)),
],
rows
);
Expand Down
7 changes: 4 additions & 3 deletions scripts/postgres.sql
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@ CREATE TABLE IF NOT EXISTS test_infinite_values(
test_int INTEGER NOT NULL,
test_date DATE,
test_timestamp TIMESTAMP,
test_real REAL
test_real REAL,
test_timestamp_timezone TIMESTAMP WITH TIME ZONE
);

INSERT INTO test_infinite_values VALUES (1, 'infinity'::DATE, 'infinity'::TIMESTAMP, 'infinity'::REAL);
INSERT INTO test_infinite_values VALUES (2, '-infinity'::DATE, '-infinity'::TIMESTAMP, '-infinity'::REAL);
INSERT INTO test_infinite_values VALUES (1, 'infinity'::DATE, 'infinity'::TIMESTAMP, 'infinity'::REAL, 'infinity'::TIMESTAMP);
INSERT INTO test_infinite_values VALUES (2, '-infinity'::DATE, '-infinity'::TIMESTAMP, '-infinity'::REAL, '-infinity'::TIMESTAMP);

CREATE TABLE IF NOT EXISTS test_str(
id INTEGER NOT NULL,
Expand Down

0 comments on commit 2a9db84

Please sign in to comment.