Skip to content

Commit

Permalink
microsecond support for arrow/arrow2 and postgres/mysql/oracle/mssql
Browse files Browse the repository at this point in the history
  • Loading branch information
wangxiaoying committed Jun 15, 2024
1 parent c5c7dec commit f7ec750
Show file tree
Hide file tree
Showing 11 changed files with 432 additions and 50 deletions.
96 changes: 95 additions & 1 deletion connectorx-python/connectorx/tests/test_arrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,101 @@ def test_arrow2(postgres_url: str) -> None:
df.sort_values(by="test_int", inplace=True, ignore_index=True)
assert_frame_equal(df, expected, check_names=True)


def test_arrow_type(postgres_url: str) -> None:
query = "SELECT test_date, test_timestamp, test_timestamptz, test_int16, test_int64, test_float32, test_numeric, test_bpchar, test_char, test_varchar, test_uuid, test_time, test_bytea, test_json, test_jsonb, test_ltree, test_name FROM test_types"
df = read_sql(postgres_url, query, return_type="arrow")
df = df.to_pandas(date_as_object=False)
df.sort_values(by="test_int16", inplace=True, ignore_index=True)
expected = pd.DataFrame(
index=range(4),
data={
"test_date": pd.Series(
["1970-01-01", "2000-02-28", "2038-01-18", None], dtype="datetime64[ms]"
),
"test_timestamp": pd.Series(
[
"1970-01-01 00:00:01",
"2000-02-28 12:00:10",
"2038-01-18 23:59:59",
None,
],
dtype="datetime64[us]",
),
"test_timestamptz": pd.Series(
[
"1970-01-01 00:00:01+00:00",
"2000-02-28 16:00:10+00:00",
"2038-01-18 15:59:59+00:00",
None,
],
dtype="datetime64[us, UTC]",
),
"test_int16": pd.Series([0, 1, 2, 3], dtype="int64"),
"test_int64": pd.Series(
[-9223372036854775808, 0, 9223372036854775807, None], dtype="float64"
),
"test_float32": pd.Series(
[None, 3.1415926535, 2.71, -1e-37], dtype="float64"
),
"test_numeric": pd.Series([None, 521.34, 0.00, 0.00], dtype="float64"),
"test_bpchar": pd.Series(["a ", "bb ", "ccc ", None], dtype="object"),
"test_char": pd.Series(["a", "b", None, "d"], dtype="object"),
"test_varchar": pd.Series([None, "bb", "c", "defghijklm"], dtype="object"),
"test_uuid": pd.Series(
[
"86b494cc-96b2-11eb-9298-3e22fbb9fe9d",
"86b49b84-96b2-11eb-9298-3e22fbb9fe9d",
"86b49c42-96b2-11eb-9298-3e22fbb9fe9d",
None,
],
dtype="object",
),
"test_time": pd.Series(
[
datetime.time(8, 12, 40),
None,
datetime.time(23, 0, 10),
datetime.time(18, 30),
],
dtype="object",
),
"test_bytea": pd.Series(
[
None,
b"\xd0\x97\xd0\xb4\xd1\x80\xd0\xb0\xcc\x81\xd0\xb2\xd1\x81\xd1\x82\xd0\xb2\xd1\x83\xd0\xb9\xd1\x82\xd0\xb5",
b"",
b"\xf0\x9f\x98\x9c",
],
dtype="object",
),
"test_json": pd.Series(
[
'{"customer":"John Doe","items":{"product":"Beer","qty":6}}',
'{"customer":"Lily Bush","items":{"product":"Diaper","qty":24}}',
'{"customer":"Josh William","items":{"product":"Toy Car","qty":1}}',
None,
],
dtype="object",
),
"test_jsonb": pd.Series(
[
'{"product":"Beer","qty":6}',
'{"product":"Diaper","qty":24}',
'{"product":"Toy Car","qty":1}',
None,
],
dtype="object",
),
"test_ltree": pd.Series(
["A.B.C.D", "A.B.E", "A", None], dtype="object"
),
"test_name": pd.Series(
["0", "21", "someName", "101203203-1212323-22131235"]
)

},
)
assert_frame_equal(df, expected, check_names=True)
def test_arrow2_type(postgres_url: str) -> None:
query = "SELECT test_date, test_timestamp, test_timestamptz, test_int16, test_int64, test_float32, test_numeric, test_bpchar, test_char, test_varchar, test_uuid, test_time, test_bytea, test_json, test_jsonb, test_f4array, test_f8array, test_narray, test_i2array, test_i4array, test_i8array, test_enum, test_ltree, test_name FROM test_types"
df = read_sql(postgres_url, query, return_type="arrow2")
Expand Down
162 changes: 151 additions & 11 deletions connectorx/src/destinations/arrow/arrow_assoc.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
use super::errors::{ArrowDestinationError, Result};
use super::{
errors::{ArrowDestinationError, Result},
typesystem::{DateTimeWrapperMicro, NaiveDateTimeWrapperMicro, NaiveTimeWrapperMicro},
};
use crate::constants::SECONDS_IN_DAY;
use arrow::array::{
ArrayBuilder, BooleanBuilder, Date32Builder, Date64Builder, Float32Builder, Float64Builder,
Int32Builder, Int64Builder, LargeBinaryBuilder, StringBuilder, Time64NanosecondBuilder,
TimestampNanosecondBuilder, UInt32Builder, UInt64Builder,
ArrayBuilder, BooleanBuilder, Date32Builder, Float32Builder, Float64Builder, Int32Builder,
Int64Builder, LargeBinaryBuilder, StringBuilder, Time64MicrosecondBuilder,
Time64NanosecondBuilder, TimestampMicrosecondBuilder, TimestampNanosecondBuilder,
UInt32Builder, UInt64Builder,
};
use arrow::datatypes::Field;
use arrow::datatypes::{DataType as ArrowDataType, TimeUnit};
Expand Down Expand Up @@ -188,6 +192,48 @@ impl ArrowAssoc for Option<DateTime<Utc>> {
}
}

impl ArrowAssoc for DateTimeWrapperMicro {
type Builder = TimestampMicrosecondBuilder;

fn builder(nrows: usize) -> Self::Builder {
TimestampMicrosecondBuilder::with_capacity(nrows).with_timezone("UTC")
}

#[throws(ArrowDestinationError)]
fn append(builder: &mut Self::Builder, value: DateTimeWrapperMicro) {
builder.append_value(value.0.timestamp_micros());
}

fn field(header: &str) -> Field {
Field::new(
header,
ArrowDataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
false,
)
}
}

impl ArrowAssoc for Option<DateTimeWrapperMicro> {
type Builder = TimestampMicrosecondBuilder;

fn builder(nrows: usize) -> Self::Builder {
TimestampMicrosecondBuilder::with_capacity(nrows).with_timezone("UTC")
}

#[throws(ArrowDestinationError)]
fn append(builder: &mut Self::Builder, value: Option<DateTimeWrapperMicro>) {
builder.append_option(value.map(|x| x.0.timestamp_micros()));
}

fn field(header: &str) -> Field {
Field::new(
header,
ArrowDataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
true,
)
}
}

fn naive_date_to_arrow(nd: NaiveDate) -> i32 {
match nd.and_hms_opt(0, 0, 0) {
Some(dt) => (dt.and_utc().timestamp() / SECONDS_IN_DAY) as i32,
Expand All @@ -196,7 +242,9 @@ fn naive_date_to_arrow(nd: NaiveDate) -> i32 {
}

fn naive_datetime_to_arrow(nd: NaiveDateTime) -> i64 {
nd.and_utc().timestamp_millis()
nd.and_utc()
.timestamp_nanos_opt()
.unwrap_or_else(|| panic!("out of range DateTime"))
}

impl ArrowAssoc for Option<NaiveDate> {
Expand Down Expand Up @@ -234,10 +282,10 @@ impl ArrowAssoc for NaiveDate {
}

impl ArrowAssoc for Option<NaiveDateTime> {
type Builder = Date64Builder;
type Builder = TimestampNanosecondBuilder;

fn builder(nrows: usize) -> Self::Builder {
Date64Builder::with_capacity(nrows)
TimestampNanosecondBuilder::with_capacity(nrows)
}

fn append(builder: &mut Self::Builder, value: Option<NaiveDateTime>) -> Result<()> {
Expand All @@ -246,15 +294,19 @@ impl ArrowAssoc for Option<NaiveDateTime> {
}

fn field(header: &str) -> Field {
Field::new(header, ArrowDataType::Date64, true)
Field::new(
header,
ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
true,
)
}
}

impl ArrowAssoc for NaiveDateTime {
type Builder = Date64Builder;
type Builder = TimestampNanosecondBuilder;

fn builder(nrows: usize) -> Self::Builder {
Date64Builder::with_capacity(nrows)
TimestampNanosecondBuilder::with_capacity(nrows)
}

fn append(builder: &mut Self::Builder, value: NaiveDateTime) -> Result<()> {
Expand All @@ -263,7 +315,56 @@ impl ArrowAssoc for NaiveDateTime {
}

fn field(header: &str) -> Field {
Field::new(header, ArrowDataType::Date64, false)
Field::new(
header,
ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
false,
)
}
}

impl ArrowAssoc for Option<NaiveDateTimeWrapperMicro> {
type Builder = TimestampMicrosecondBuilder;

fn builder(nrows: usize) -> Self::Builder {
TimestampMicrosecondBuilder::with_capacity(nrows)
}

fn append(builder: &mut Self::Builder, value: Option<NaiveDateTimeWrapperMicro>) -> Result<()> {
builder.append_option(match value {
Some(v) => Some(v.0.and_utc().timestamp_micros()),
None => None,
});
Ok(())
}

fn field(header: &str) -> Field {
Field::new(
header,
ArrowDataType::Timestamp(TimeUnit::Microsecond, None),
true,
)
}
}

impl ArrowAssoc for NaiveDateTimeWrapperMicro {
type Builder = TimestampMicrosecondBuilder;

fn builder(nrows: usize) -> Self::Builder {
TimestampMicrosecondBuilder::with_capacity(nrows)
}

fn append(builder: &mut Self::Builder, value: NaiveDateTimeWrapperMicro) -> Result<()> {
builder.append_value(value.0.and_utc().timestamp_micros());
Ok(())
}

fn field(header: &str) -> Field {
Field::new(
header,
ArrowDataType::Timestamp(TimeUnit::Microsecond, None),
false,
)
}
}

Expand Down Expand Up @@ -307,6 +408,45 @@ impl ArrowAssoc for NaiveTime {
}
}

impl ArrowAssoc for Option<NaiveTimeWrapperMicro> {
type Builder = Time64MicrosecondBuilder;

fn builder(nrows: usize) -> Self::Builder {
Time64MicrosecondBuilder::with_capacity(nrows)
}

fn append(builder: &mut Self::Builder, value: Option<NaiveTimeWrapperMicro>) -> Result<()> {
builder.append_option(value.map(|t| {
t.0.num_seconds_from_midnight() as i64 * 1_000_000 + (t.0.nanosecond() as i64) / 1000
}));
Ok(())
}

fn field(header: &str) -> Field {
Field::new(header, ArrowDataType::Time64(TimeUnit::Microsecond), true)
}
}

impl ArrowAssoc for NaiveTimeWrapperMicro {
type Builder = Time64MicrosecondBuilder;

fn builder(nrows: usize) -> Self::Builder {
Time64MicrosecondBuilder::with_capacity(nrows)
}

fn append(builder: &mut Self::Builder, value: NaiveTimeWrapperMicro) -> Result<()> {
builder.append_value(
value.0.num_seconds_from_midnight() as i64 * 1_000_000
+ (value.0.nanosecond() as i64) / 1000,
);
Ok(())
}

fn field(header: &str) -> Field {
Field::new(header, ArrowDataType::Time64(TimeUnit::Microsecond), false)
}
}

impl ArrowAssoc for Option<Vec<u8>> {
type Builder = LargeBinaryBuilder;

Expand Down
15 changes: 15 additions & 0 deletions connectorx/src/destinations/arrow/typesystem.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
use crate::impl_typesystem;
use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc};

#[derive(Debug, Clone, Copy)]
pub struct DateTimeWrapperMicro(pub DateTime<Utc>);

#[derive(Debug, Clone, Copy)]
pub struct NaiveTimeWrapperMicro(pub NaiveTime);

#[derive(Debug, Clone, Copy)]
pub struct NaiveDateTimeWrapperMicro(pub NaiveDateTime);

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum ArrowTypeSystem {
Int32(bool),
Expand All @@ -14,8 +23,11 @@ pub enum ArrowTypeSystem {
LargeBinary(bool),
Date32(bool),
Date64(bool),
Date64Micro(bool),
Time64(bool),
Time64Micro(bool),
DateTimeTz(bool),
DateTimeTzMicro(bool),
}

impl_typesystem! {
Expand All @@ -32,7 +44,10 @@ impl_typesystem! {
{ LargeBinary => Vec<u8> }
{ Date32 => NaiveDate }
{ Date64 => NaiveDateTime }
{ Date64Micro => NaiveDateTimeWrapperMicro }
{ Time64 => NaiveTime }
{ Time64Micro => NaiveTimeWrapperMicro }
{ DateTimeTz => DateTime<Utc> }
{ DateTimeTzMicro => DateTimeWrapperMicro }
}
}
12 changes: 10 additions & 2 deletions connectorx/src/sources/oracle/typesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ pub enum OracleTypeSystem {
Date(bool),
Timestamp(bool),
TimestampTz(bool),
TimestampNano(bool),
TimestampTzNano(bool),
}

impl_typesystem! {
Expand All @@ -26,8 +28,8 @@ impl_typesystem! {
{ Float | NumFloat | BinaryFloat | BinaryDouble => f64 }
{ Blob => Vec<u8>}
{ Clob | VarChar | Char | NVarChar | NChar => String }
{ Date | Timestamp => NaiveDateTime }
{ TimestampTz => DateTime<Utc> }
{ Date | Timestamp | TimestampNano => NaiveDateTime }
{ TimestampTz | TimestampTzNano => DateTime<Utc> }
}
}

Expand All @@ -48,7 +50,13 @@ impl<'a> From<&'a OracleType> for OracleTypeSystem {
OracleType::Varchar2(_) => VarChar(true),
OracleType::NVarchar2(_) => NVarChar(true),
OracleType::Date => Date(true),
OracleType::Timestamp(7) | OracleType::Timestamp(8) | OracleType::Timestamp(9) => {
TimestampNano(true)
}
OracleType::Timestamp(_) => Timestamp(true),
OracleType::TimestampTZ(7)
| OracleType::TimestampTZ(8)
| OracleType::TimestampTZ(9) => TimestampTzNano(true),
OracleType::TimestampTZ(_) => TimestampTz(true),
_ => unimplemented!("{}", format!("Type {:?} not implemented for oracle!", ty)),
}
Expand Down
Loading

0 comments on commit f7ec750

Please sign in to comment.