Skip to content

Commit

Permalink
Simplify retrieval of timestamp decoder
Browse files Browse the repository at this point in the history
  • Loading branch information
Jefffrey committed Sep 22, 2024
1 parent cfb955b commit 6e9f957
Showing 1 changed file with 29 additions and 45 deletions.
74 changes: 29 additions & 45 deletions src/array_decoder/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,11 @@ const NANOSECOND_DIGITS: i8 = 9;
/// point for all timestamp values, to the UNIX epoch of 1 January 1970.
const ORC_EPOCH_UTC_SECONDS_SINCE_UNIX_EPOCH: i64 = 1_420_070_400;

fn get_timestamp_decoder<T: ArrowTimestampType + Send>(
fn get_inner_timestamp_decoder<T: ArrowTimestampType + Send>(
column: &Column,
stripe: &Stripe,
seconds_since_unix_epoch: i64,
is_instant: bool,
) -> Result<Box<dyn ArrayBatchDecoder>> {
) -> Result<PrimitiveArrayDecoder<T>> {
let data = stripe.stream_map().get(column, Kind::Data);
let data = get_rle_reader(column, data)?;

Expand All @@ -64,17 +63,31 @@ fn get_timestamp_decoder<T: ArrowTimestampType + Send>(
data,
secondary,
));
let inner = PrimitiveArrayDecoder::<T>::new(iter, present);
if is_instant {
Ok(Box::new(TimestampInstantArrayDecoder(inner)))
} else {
match stripe.writer_tz() {
Some(writer_tz) => Ok(Box::new(TimestampOffsetArrayDecoder { inner, writer_tz })),
None => Ok(Box::new(inner)),
}
Ok(PrimitiveArrayDecoder::<T>::new(iter, present))
}

fn get_timestamp_decoder<T: ArrowTimestampType + Send>(
column: &Column,
stripe: &Stripe,
seconds_since_unix_epoch: i64,
) -> Result<Box<dyn ArrayBatchDecoder>> {
let inner = get_inner_timestamp_decoder::<T>(column, stripe, seconds_since_unix_epoch)?;
match stripe.writer_tz() {
Some(writer_tz) => Ok(Box::new(TimestampOffsetArrayDecoder { inner, writer_tz })),
None => Ok(Box::new(inner)),
}
}

fn get_timestamp_instant_decoder<T: ArrowTimestampType + Send>(
column: &Column,
stripe: &Stripe,
) -> Result<Box<dyn ArrayBatchDecoder>> {
// TIMESTAMP_INSTANT is encoded as UTC so we don't check writer timezone in stripe
let inner =
get_inner_timestamp_decoder::<T>(column, stripe, ORC_EPOCH_UTC_SECONDS_SINCE_UNIX_EPOCH)?;
Ok(Box::new(TimestampInstantArrayDecoder(inner)))
}

fn decimal128_decoder(
column: &Column,
stripe: &Stripe,
Expand Down Expand Up @@ -151,35 +164,27 @@ pub fn new_timestamp_decoder(

match field_type {
ArrowDataType::Timestamp(TimeUnit::Second, None) => {
get_timestamp_decoder::<TimestampSecondType>(
column,
stripe,
seconds_since_unix_epoch,
false,
)
get_timestamp_decoder::<TimestampSecondType>(column, stripe, seconds_since_unix_epoch)
}
ArrowDataType::Timestamp(TimeUnit::Millisecond, None) => {
get_timestamp_decoder::<TimestampMillisecondType>(
column,
stripe,
seconds_since_unix_epoch,
false,
)
}
ArrowDataType::Timestamp(TimeUnit::Microsecond, None) => {
get_timestamp_decoder::<TimestampMicrosecondType>(
column,
stripe,
seconds_since_unix_epoch,
false,
)
}
ArrowDataType::Timestamp(TimeUnit::Nanosecond, None) => {
get_timestamp_decoder::<TimestampNanosecondType>(
column,
stripe,
seconds_since_unix_epoch,
false,
)
}
ArrowDataType::Decimal128(Decimal128Type::MAX_PRECISION, NANOSECOND_DIGITS) => {
Expand All @@ -206,38 +211,17 @@ pub fn new_timestamp_instant_decoder(
stripe: &Stripe,
) -> Result<Box<dyn ArrayBatchDecoder>> {
match field_type {
// TIMESTAMP_INSTANT is encoded as UTC so we don't check writer timezone in stripe
ArrowDataType::Timestamp(TimeUnit::Second, Some(tz)) if tz.as_ref() == "UTC" => {
get_timestamp_decoder::<TimestampSecondType>(
column,
stripe,
ORC_EPOCH_UTC_SECONDS_SINCE_UNIX_EPOCH,
true,
)
get_timestamp_instant_decoder::<TimestampSecondType>(column, stripe)
}
ArrowDataType::Timestamp(TimeUnit::Millisecond, Some(tz)) if tz.as_ref() == "UTC" => {
get_timestamp_decoder::<TimestampMillisecondType>(
column,
stripe,
ORC_EPOCH_UTC_SECONDS_SINCE_UNIX_EPOCH,
true,
)
get_timestamp_instant_decoder::<TimestampMillisecondType>(column, stripe)
}
ArrowDataType::Timestamp(TimeUnit::Microsecond, Some(tz)) if tz.as_ref() == "UTC" => {
get_timestamp_decoder::<TimestampMicrosecondType>(
column,
stripe,
ORC_EPOCH_UTC_SECONDS_SINCE_UNIX_EPOCH,
true,
)
get_timestamp_instant_decoder::<TimestampMicrosecondType>(column, stripe)
}
ArrowDataType::Timestamp(TimeUnit::Nanosecond, Some(tz)) if tz.as_ref() == "UTC" => {
get_timestamp_decoder::<TimestampNanosecondType>(
column,
stripe,
ORC_EPOCH_UTC_SECONDS_SINCE_UNIX_EPOCH,
true,
)
get_timestamp_instant_decoder::<TimestampNanosecondType>(column, stripe)
}
ArrowDataType::Timestamp(_, Some(_)) => UnsupportedTypeVariantSnafu {
msg: "Non-UTC Arrow timestamps",
Expand Down

0 comments on commit 6e9f957

Please sign in to comment.