Skip to content

Commit

Permalink
Remove Iterator implementations for Decoders
Browse files Browse the repository at this point in the history
  • Loading branch information
Jefffrey committed Sep 28, 2024
1 parent 6f8c5d6 commit 77f2c96
Show file tree
Hide file tree
Showing 12 changed files with 139 additions and 410 deletions.
27 changes: 1 addition & 26 deletions src/array_decoder/decimal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,7 @@ pub fn new_decimal_decoder(
fixed_scale: u32,
) -> Result<Box<dyn ArrayBatchDecoder>> {
let varint_iter = stripe.stream_map().get(column, Kind::Data);
let varint_iter = Box::new(UnboundedVarintStreamDecoder::new(
varint_iter,
stripe.number_of_rows(),
));
let varint_iter = Box::new(UnboundedVarintStreamDecoder::new(varint_iter));

// Scale is specified on a per varint basis (in addition to being encoded in the type)
let scale_iter = stripe.stream_map().get(column, Kind::Secondary);
Expand Down Expand Up @@ -68,28 +65,6 @@ struct DecimalScaleRepairDecoder {
fixed_scale: u32,
}

impl DecimalScaleRepairDecoder {
#[inline]
fn next_helper(&mut self, varint: Result<i128>, scale: Result<i32>) -> Result<i128> {
Ok(fix_i128_scale(varint?, self.fixed_scale, scale?))
}
}

// TODO: remove this
impl Iterator for DecimalScaleRepairDecoder {
type Item = Result<i128>;

fn next(&mut self) -> Option<Self::Item> {
// TODO: throw error for mismatched stream lengths?
let (varint, scale) = self
.varint_iter
.by_ref()
.zip(self.scale_iter.by_ref())
.next()?;
Some(self.next_helper(varint, scale))
}
}

impl PrimitiveValueDecoder<i128> for DecimalScaleRepairDecoder {
fn decode(&mut self, out: &mut [i128]) -> Result<()> {
// TODO: can probably optimize, reuse buffers?
Expand Down
19 changes: 3 additions & 16 deletions src/array_decoder/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,8 +287,7 @@ impl<T: ArrowTimestampType> ArrayBatchDecoder for TimestampInstantArrayDecoder<T
struct TimestampNanosecondAsDecimalWithTzDecoder(TimestampNanosecondAsDecimalDecoder, Tz);

impl TimestampNanosecondAsDecimalWithTzDecoder {
fn next_inner(&self, ts: Result<i128>) -> Result<i128> {
let ts = ts?;
fn next_inner(&self, ts: i128) -> i128 {
let seconds = ts.div_euclid(NANOSECONDS_IN_SECOND);
let nanoseconds = ts.rem_euclid(NANOSECONDS_IN_SECOND);

Expand All @@ -300,27 +299,15 @@ impl TimestampNanosecondAsDecimalWithTzDecoder {
.naive_local()
.and_utc();

Ok(
(dt.timestamp() as i128) * NANOSECONDS_IN_SECOND
+ (dt.timestamp_subsec_nanos() as i128),
)
}
}

impl Iterator for TimestampNanosecondAsDecimalWithTzDecoder {
type Item = Result<i128>;

fn next(&mut self) -> Option<Self::Item> {
let ts = self.0.next()?;
Some(self.next_inner(ts))
(dt.timestamp() as i128) * NANOSECONDS_IN_SECOND + (dt.timestamp_subsec_nanos() as i128)
}
}

impl PrimitiveValueDecoder<i128> for TimestampNanosecondAsDecimalWithTzDecoder {
fn decode(&mut self, out: &mut [i128]) -> Result<()> {
self.0.decode(out)?;
for x in out.iter_mut() {
*x = self.next_inner(Ok(*x))?;
*x = self.next_inner(*x);
}
Ok(())
}
Expand Down
27 changes: 8 additions & 19 deletions src/array_decoder/union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use snafu::ResultExt;

use crate::column::{get_present_vec, Column};
use crate::encoding::byte::ByteRleDecoder;
use crate::encoding::PrimitiveValueDecoder;
use crate::error::ArrowSnafu;
use crate::error::Result;
use crate::proto::stream::Kind;
Expand All @@ -37,7 +38,7 @@ pub struct UnionArrayDecoder {
// TODO: encode this assumption into types
fields: UnionFields,
variants: Vec<Box<dyn ArrayBatchDecoder>>,
tags: Box<dyn Iterator<Item = Result<i8>> + Send>,
tags: Box<dyn PrimitiveValueDecoder<i8> + Send>,
present: Option<Box<dyn Iterator<Item = bool> + Send>>,
}

Expand Down Expand Up @@ -72,30 +73,18 @@ impl ArrayBatchDecoder for UnionArrayDecoder {
parent_present: Option<&[bool]>,
) -> Result<ArrayRef> {
let present = derive_present_vec(&mut self.present, parent_present, batch_size);
let mut tags = vec![0; batch_size];
let tags = match &present {
Some(present) => {
// Since UnionArrays don't have nullability, we rely on child arrays.
// So we default to first child (tag 0) for any nulls from this parent Union.
let mut tags = vec![0; batch_size];
for index in present
.iter()
.enumerate()
.filter_map(|(index, &is_present)| is_present.then_some(index))
{
// TODO: return as error instead
tags[index] = self
.tags
.next()
.transpose()?
.expect("array less than expected length");
}
self.tags.decode_spaced(&mut tags, present)?;
tags
}
None => {
self.tags.decode(&mut tags)?;
tags
}
None => self
.tags
.by_ref()
.take(batch_size)
.collect::<Result<Vec<_>>>()?,
};

// Calculate nullability for children
Expand Down
20 changes: 15 additions & 5 deletions src/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use bytes::Bytes;
use snafu::ResultExt;

use crate::encoding::boolean::BooleanDecoder;
use crate::encoding::PrimitiveValueDecoder;
use crate::error::{IoSnafu, Result};
use crate::proto::stream::Kind;
use crate::proto::{ColumnEncoding, StripeFooter};
Expand All @@ -41,6 +42,8 @@ impl Column {
name: &str,
data_type: &DataType,
footer: &Arc<StripeFooter>,
// TODO: inaccurate to grab this from stripe; consider list types
// (inner list will have more values/rows than in actual stripe)
number_of_rows: u64,
) -> Self {
Self {
Expand Down Expand Up @@ -159,9 +162,16 @@ impl Column {
///
/// Makes subsequent operations easier to handle.
pub fn get_present_vec(column: &Column, stripe: &Stripe) -> Result<Option<Vec<bool>>> {
stripe
.stream_map()
.get_opt(column, Kind::Present)
.map(|reader| BooleanDecoder::new(reader).collect::<Result<Vec<_>>>())
.transpose()
if let Some(decoder) = stripe.stream_map().get_opt(column, Kind::Present) {
// TODO: this is very inefficient, need to refactor/optimize
let mut decoder = BooleanDecoder::new(decoder);
let mut one = [false];
let mut present = Vec::new();
while decoder.decode(&mut one).is_ok() {
present.push(one[0]);
}
Ok(Some(present))
} else {
Ok(None)
}
}
93 changes: 25 additions & 68 deletions src/encoding/boolean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,7 @@ use arrow::{
};
use bytes::Bytes;

use crate::{
error::{OutOfSpecSnafu, Result},
memory::EstimateMemory,
};
use crate::{error::Result, memory::EstimateMemory};

use super::{
byte::{ByteRleDecoder, ByteRleEncoder},
Expand Down Expand Up @@ -57,48 +54,17 @@ impl<R: Read> BooleanDecoder<R> {
}
}

impl<R: Read> Iterator for BooleanDecoder<R> {
type Item = Result<bool>;

#[inline]
fn next(&mut self) -> Option<Self::Item> {
// read more data if necessary
if self.bits_in_data == 0 {
match self.decoder.next() {
Some(Ok(data)) => {
self.data = data as u8;
self.bits_in_data = 8;
Some(Ok(self.value()))
}
Some(Err(err)) => Some(Err(err)),
None => None,
}
} else {
Some(Ok(self.value()))
}
}
}

impl<R: Read> PrimitiveValueDecoder<bool> for BooleanDecoder<R> {
// TODO: can probably implement this better, just copying from iter for now
// TODO: can probably implement this better
fn decode(&mut self, out: &mut [bool]) -> Result<()> {
for x in out.iter_mut() {
// read more data if necessary
if self.bits_in_data == 0 {
match self.decoder.next() {
Some(Ok(data)) => {
self.data = data as u8;
self.bits_in_data = 8;
*x = self.value();
}
Some(Err(err)) => return Err(err),
None => {
return OutOfSpecSnafu {
msg: "Array length less than expected",
}
.fail()
}
}
let mut data = [0];
self.decoder.decode(&mut data)?;
self.data = data[0] as u8;
self.bits_in_data = 8;
*x = self.value();
} else {
*x = self.value();
}
Expand Down Expand Up @@ -169,47 +135,38 @@ mod tests {

#[test]
fn basic() {
let expected = vec![false; 800];
let data = [0x61u8, 0x00];

let data = &mut data.as_ref();

let iter = BooleanDecoder::new(data)
.collect::<Result<Vec<_>>>()
.unwrap();
assert_eq!(iter, vec![false; 800])
let mut decoder = BooleanDecoder::new(data);
let mut actual = vec![true; expected.len()];
decoder.decode(&mut actual).unwrap();
assert_eq!(actual, expected)
}

#[test]
fn literals() {
let expected = vec![
false, true, false, false, false, true, false, false, // 0b01000100
false, true, false, false, false, true, false, true, // 0b01000101
];
let data = [0xfeu8, 0b01000100, 0b01000101];

let data = &mut data.as_ref();

let iter = BooleanDecoder::new(data)
.collect::<Result<Vec<_>>>()
.unwrap();
assert_eq!(
iter,
vec![
false, true, false, false, false, true, false, false, // 0b01000100
false, true, false, false, false, true, false, true, // 0b01000101
]
)
let mut decoder = BooleanDecoder::new(data);
let mut actual = vec![true; expected.len()];
decoder.decode(&mut actual).unwrap();
assert_eq!(actual, expected)
}

#[test]
fn another() {
// "For example, the byte sequence [0xff, 0x80] would be one true followed by seven false values."
let expected = vec![true, false, false, false, false, false, false, false];
let data = [0xff, 0x80];

let data = &mut data.as_ref();

let iter = BooleanDecoder::new(data)
.collect::<Result<Vec<_>>>()
.unwrap();
assert_eq!(
iter,
vec![true, false, false, false, false, false, false, false]
)
let mut decoder = BooleanDecoder::new(data);
let mut actual = vec![true; expected.len()];
decoder.decode(&mut actual).unwrap();
assert_eq!(actual, expected)
}
}
52 changes: 21 additions & 31 deletions src/encoding/byte.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,21 +231,8 @@ impl<R: Read> ByteRleDecoder<R> {
}
}

impl<R: Read> Iterator for ByteRleDecoder<R> {
type Item = Result<i8>;

fn next(&mut self) -> Option<Self::Item> {
if self.index == self.leftovers.len() {
self.read_values().ok()?;
}
let value = self.leftovers[self.index] as i8;
self.index += 1;
Some(Ok(value))
}
}

impl<R: Read> PrimitiveValueDecoder<i8> for ByteRleDecoder<R> {
// TODO: can probably implement this better, just copying from iter for now
// TODO: can probably implement this better
fn decode(&mut self, out: &mut [i8]) -> Result<()> {
for x in out.iter_mut() {
if self.index == self.leftovers.len() {
Expand All @@ -266,28 +253,29 @@ mod tests {

use proptest::prelude::*;

// TODO: have tests varying the out buffer, to ensure decode() is called
// multiple times

fn test_helper(data: &[u8], expected: &[i8]) {
let mut reader = ByteRleDecoder::new(Cursor::new(data));
let mut actual = vec![0; expected.len()];
reader.decode(&mut actual).unwrap();
assert_eq!(actual, expected);
}

#[test]
fn reader_test() {
let data = [0x61u8, 0x00];
let data = &mut data.as_ref();
let iter = ByteRleDecoder::new(data)
.collect::<Result<Vec<_>>>()
.unwrap();
assert_eq!(iter, vec![0; 100]);
let expected = [0; 100];
test_helper(&data, &expected);

let data = [0x01, 0x01];
let data = &mut data.as_ref();
let iter = ByteRleDecoder::new(data)
.collect::<Result<Vec<_>>>()
.unwrap();
assert_eq!(iter, vec![1; 4]);
let expected = [1; 4];
test_helper(&data, &expected);

let data = [0xfe, 0x44, 0x45];
let data = &mut data.as_ref();
let iter = ByteRleDecoder::new(data)
.collect::<Result<Vec<_>>>()
.unwrap();
assert_eq!(iter, vec![0x44, 0x45]);
let expected = [0x44, 0x45];
test_helper(&data, &expected);
}

fn roundtrip_byte_rle_helper(values: &[i8]) -> Result<Vec<i8>> {
Expand All @@ -297,8 +285,10 @@ mod tests {

let buf = writer.take_inner();
let mut cursor = Cursor::new(&buf);
let reader = ByteRleDecoder::new(&mut cursor);
reader.into_iter().collect::<Result<Vec<_>>>()
let mut reader = ByteRleDecoder::new(&mut cursor);
let mut actual = vec![0; values.len()];
reader.decode(&mut actual)?;
Ok(actual)
}

#[derive(Debug, Clone)]
Expand Down
Loading

0 comments on commit 77f2c96

Please sign in to comment.