Skip to content

Commit

Permalink
Fixed Option types for csv parser
Browse files Browse the repository at this point in the history
  • Loading branch information
AndrewJackson2020 committed Feb 14, 2024
1 parent 17659bf commit 371fd27
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 0 deletions.
6 changes: 6 additions & 0 deletions connectorx/src/sources/postgres/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -780,6 +780,8 @@ impl<'r, 'a> Produce<'r, Option<Decimal>> for PostgresCSVSourceParser<'a> {
let (ridx, cidx) = self.next_loc()?;
match &self.rowbuf[ridx][cidx][..] {
"" => None,
"Infinity" => Some(Decimal::MAX),
"-Infinity" => Some(Decimal::MIN),
v => Some(v.parse().map_err(|_| {
ConnectorXError::cannot_produce::<Decimal>(Some(v.into()))
})?),
Expand Down Expand Up @@ -844,6 +846,8 @@ impl<'r, 'a> Produce<'r, Option<NaiveDate>> for PostgresCSVSourceParser<'a> {
let (ridx, cidx) = self.next_loc()?;
match &self.rowbuf[ridx][cidx][..] {
"" => None,
"infinity" => Some(NaiveDate::MAX),
"-infinity" => Some(NaiveDate::MIN),
v => Some(
NaiveDate::parse_from_str(v, "%Y-%m-%d")
.map_err(|_| ConnectorXError::cannot_produce::<NaiveDate>(Some(v.into())))?,
Expand Down Expand Up @@ -880,6 +884,8 @@ impl<'r, 'a> Produce<'r, Option<NaiveDateTime>> for PostgresCSVSourceParser<'a>
let (ridx, cidx) = self.next_loc()?;
match &self.rowbuf[ridx][cidx][..] {
"" => None,
"infinity" => Some(NaiveDateTime::MAX),
"-infinity" => Some(NaiveDateTime::MIN),
v => Some(
NaiveDateTime::parse_from_str(v, "%Y-%m-%d %H:%M:%S").map_err(|_| {
ConnectorXError::cannot_produce::<NaiveDateTime>(Some(v.into()))
Expand Down
50 changes: 50 additions & 0 deletions connectorx/tests/test_postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,56 @@ fn test_csv_infinite_values() {
);
}

#[test]
fn test_csv_infinite_values_option() {

let _ = env_logger::builder().is_test(true).try_init();

let dburl = env::var("POSTGRES_URL").unwrap();
#[derive(Debug, PartialEq)]
struct Row(i32, Option<NaiveDate>, Option<NaiveDateTime>, Option<Decimal>);

let url = Url::parse(dburl.as_str()).unwrap();
let (config, _tls) = rewrite_tls_args(&url).unwrap();
let mut source = PostgresSource::<CSVProtocol, NoTls>::new(config, NoTls, 1).unwrap();
source.set_queries(&[CXQuery::naked("select * from test_infinite_values")]);
source.fetch_metadata().unwrap();

let mut partitions = source.partition().unwrap();
assert!(partitions.len() == 1);
let mut partition = partitions.remove(0);
partition.result_rows().expect("run query");

assert_eq!(2, partition.nrows());
assert_eq!(4, partition.ncols());

let mut parser = partition.parser().unwrap();

let mut rows: Vec<Row> = Vec::new();
loop {
let (n, is_last) = parser.fetch_next().unwrap();
for _i in 0..n {
rows.push(Row(
parser.produce().unwrap(),
parser.produce().unwrap(),
parser.produce().unwrap(),
parser.produce().unwrap(),
));
}
if is_last {
break;
}
}
assert_eq!(
vec![
Row(1, Some(NaiveDate::MAX), Some(NaiveDateTime::MAX), Some(Decimal::MAX)),
Row(2, Some(NaiveDate::MIN), Some(NaiveDateTime::MIN), Some(Decimal::MIN)),
],
rows
);
}


#[test]
fn test_postgres_csv() {
let _ = env_logger::builder().is_test(true).try_init();
Expand Down

0 comments on commit 371fd27

Please sign in to comment.