diff --git a/src/array_decoder/decimal.rs b/src/array_decoder/decimal.rs index a9bf7ec3..b9cad5bb 100644 --- a/src/array_decoder/decimal.rs +++ b/src/array_decoder/decimal.rs @@ -33,10 +33,7 @@ pub fn new_decimal_decoder( fixed_scale: u32, ) -> Result> { let varint_iter = stripe.stream_map().get(column, Kind::Data); - let varint_iter = Box::new(UnboundedVarintStreamDecoder::new( - varint_iter, - stripe.number_of_rows(), - )); + let varint_iter = Box::new(UnboundedVarintStreamDecoder::new(varint_iter)); // Scale is specified on a per varint basis (in addition to being encoded in the type) let scale_iter = stripe.stream_map().get(column, Kind::Secondary); @@ -68,28 +65,6 @@ struct DecimalScaleRepairDecoder { fixed_scale: u32, } -impl DecimalScaleRepairDecoder { - #[inline] - fn next_helper(&mut self, varint: Result, scale: Result) -> Result { - Ok(fix_i128_scale(varint?, self.fixed_scale, scale?)) - } -} - -// TODO: remove this -impl Iterator for DecimalScaleRepairDecoder { - type Item = Result; - - fn next(&mut self) -> Option { - // TODO: throw error for mismatched stream lengths? - let (varint, scale) = self - .varint_iter - .by_ref() - .zip(self.scale_iter.by_ref()) - .next()?; - Some(self.next_helper(varint, scale)) - } -} - impl PrimitiveValueDecoder for DecimalScaleRepairDecoder { fn decode(&mut self, out: &mut [i128]) -> Result<()> { // TODO: can probably optimize, reuse buffers? diff --git a/src/array_decoder/timestamp.rs b/src/array_decoder/timestamp.rs index 88a369a4..940b63e0 100644 --- a/src/array_decoder/timestamp.rs +++ b/src/array_decoder/timestamp.rs @@ -287,8 +287,7 @@ impl ArrayBatchDecoder for TimestampInstantArrayDecoder) -> Result { - let ts = ts?; + fn next_inner(&self, ts: i128) -> i128 { let seconds = ts.div_euclid(NANOSECONDS_IN_SECOND); let nanoseconds = ts.rem_euclid(NANOSECONDS_IN_SECOND); @@ -300,19 +299,7 @@ impl TimestampNanosecondAsDecimalWithTzDecoder { .naive_local() .and_utc(); - Ok( - (dt.timestamp() as i128) * NANOSECONDS_IN_SECOND - + (dt.timestamp_subsec_nanos() as i128), - ) - } -} - -impl Iterator for TimestampNanosecondAsDecimalWithTzDecoder { - type Item = Result; - - fn next(&mut self) -> Option { - let ts = self.0.next()?; - Some(self.next_inner(ts)) + (dt.timestamp() as i128) * NANOSECONDS_IN_SECOND + (dt.timestamp_subsec_nanos() as i128) } } @@ -320,7 +307,7 @@ impl PrimitiveValueDecoder for TimestampNanosecondAsDecimalWithTzDecoder { fn decode(&mut self, out: &mut [i128]) -> Result<()> { self.0.decode(out)?; for x in out.iter_mut() { - *x = self.next_inner(Ok(*x))?; + *x = self.next_inner(*x); } Ok(()) } diff --git a/src/array_decoder/union.rs b/src/array_decoder/union.rs index b4a2fe91..cb347664 100644 --- a/src/array_decoder/union.rs +++ b/src/array_decoder/union.rs @@ -24,6 +24,7 @@ use snafu::ResultExt; use crate::column::{get_present_vec, Column}; use crate::encoding::byte::ByteRleDecoder; +use crate::encoding::PrimitiveValueDecoder; use crate::error::ArrowSnafu; use crate::error::Result; use crate::proto::stream::Kind; @@ -37,7 +38,7 @@ pub struct UnionArrayDecoder { // TODO: encode this assumption into types fields: UnionFields, variants: Vec>, - tags: Box> + Send>, + tags: Box + Send>, present: Option + Send>>, } @@ -72,30 +73,18 @@ impl ArrayBatchDecoder for UnionArrayDecoder { parent_present: Option<&[bool]>, ) -> Result { let present = derive_present_vec(&mut self.present, parent_present, batch_size); + let mut tags = vec![0; batch_size]; let tags = match &present { Some(present) => { // Since UnionArrays don't have nullability, we rely on child arrays. // So we default to first child (tag 0) for any nulls from this parent Union. - let mut tags = vec![0; batch_size]; - for index in present - .iter() - .enumerate() - .filter_map(|(index, &is_present)| is_present.then_some(index)) - { - // TODO: return as error instead - tags[index] = self - .tags - .next() - .transpose()? - .expect("array less than expected length"); - } + self.tags.decode_spaced(&mut tags, present)?; + tags + } + None => { + self.tags.decode(&mut tags)?; tags } - None => self - .tags - .by_ref() - .take(batch_size) - .collect::>>()?, }; // Calculate nullability for children diff --git a/src/column.rs b/src/column.rs index fb61f504..f9078baa 100644 --- a/src/column.rs +++ b/src/column.rs @@ -21,6 +21,7 @@ use bytes::Bytes; use snafu::ResultExt; use crate::encoding::boolean::BooleanDecoder; +use crate::encoding::PrimitiveValueDecoder; use crate::error::{IoSnafu, Result}; use crate::proto::stream::Kind; use crate::proto::{ColumnEncoding, StripeFooter}; @@ -41,6 +42,8 @@ impl Column { name: &str, data_type: &DataType, footer: &Arc, + // TODO: inaccurate to grab this from stripe; consider list types + // (inner list will have more values/rows than in actual stripe) number_of_rows: u64, ) -> Self { Self { @@ -159,9 +162,16 @@ impl Column { /// /// Makes subsequent operations easier to handle. pub fn get_present_vec(column: &Column, stripe: &Stripe) -> Result>> { - stripe - .stream_map() - .get_opt(column, Kind::Present) - .map(|reader| BooleanDecoder::new(reader).collect::>>()) - .transpose() + if let Some(decoder) = stripe.stream_map().get_opt(column, Kind::Present) { + // TODO: this is very inefficient, need to refactor/optimize + let mut decoder = BooleanDecoder::new(decoder); + let mut one = [false]; + let mut present = Vec::new(); + while decoder.decode(&mut one).is_ok() { + present.push(one[0]); + } + Ok(Some(present)) + } else { + Ok(None) + } } diff --git a/src/encoding/boolean.rs b/src/encoding/boolean.rs index 0a3f1b33..8c39df2e 100644 --- a/src/encoding/boolean.rs +++ b/src/encoding/boolean.rs @@ -23,10 +23,7 @@ use arrow::{ }; use bytes::Bytes; -use crate::{ - error::{OutOfSpecSnafu, Result}, - memory::EstimateMemory, -}; +use crate::{error::Result, memory::EstimateMemory}; use super::{ byte::{ByteRleDecoder, ByteRleEncoder}, @@ -57,48 +54,17 @@ impl BooleanDecoder { } } -impl Iterator for BooleanDecoder { - type Item = Result; - - #[inline] - fn next(&mut self) -> Option { - // read more data if necessary - if self.bits_in_data == 0 { - match self.decoder.next() { - Some(Ok(data)) => { - self.data = data as u8; - self.bits_in_data = 8; - Some(Ok(self.value())) - } - Some(Err(err)) => Some(Err(err)), - None => None, - } - } else { - Some(Ok(self.value())) - } - } -} - impl PrimitiveValueDecoder for BooleanDecoder { - // TODO: can probably implement this better, just copying from iter for now + // TODO: can probably implement this better fn decode(&mut self, out: &mut [bool]) -> Result<()> { for x in out.iter_mut() { // read more data if necessary if self.bits_in_data == 0 { - match self.decoder.next() { - Some(Ok(data)) => { - self.data = data as u8; - self.bits_in_data = 8; - *x = self.value(); - } - Some(Err(err)) => return Err(err), - None => { - return OutOfSpecSnafu { - msg: "Array length less than expected", - } - .fail() - } - } + let mut data = [0]; + self.decoder.decode(&mut data)?; + self.data = data[0] as u8; + self.bits_in_data = 8; + *x = self.value(); } else { *x = self.value(); } @@ -169,47 +135,38 @@ mod tests { #[test] fn basic() { + let expected = vec![false; 800]; let data = [0x61u8, 0x00]; - let data = &mut data.as_ref(); - - let iter = BooleanDecoder::new(data) - .collect::>>() - .unwrap(); - assert_eq!(iter, vec![false; 800]) + let mut decoder = BooleanDecoder::new(data); + let mut actual = vec![true; expected.len()]; + decoder.decode(&mut actual).unwrap(); + assert_eq!(actual, expected) } #[test] fn literals() { + let expected = vec![ + false, true, false, false, false, true, false, false, // 0b01000100 + false, true, false, false, false, true, false, true, // 0b01000101 + ]; let data = [0xfeu8, 0b01000100, 0b01000101]; - let data = &mut data.as_ref(); - - let iter = BooleanDecoder::new(data) - .collect::>>() - .unwrap(); - assert_eq!( - iter, - vec![ - false, true, false, false, false, true, false, false, // 0b01000100 - false, true, false, false, false, true, false, true, // 0b01000101 - ] - ) + let mut decoder = BooleanDecoder::new(data); + let mut actual = vec![true; expected.len()]; + decoder.decode(&mut actual).unwrap(); + assert_eq!(actual, expected) } #[test] fn another() { // "For example, the byte sequence [0xff, 0x80] would be one true followed by seven false values." + let expected = vec![true, false, false, false, false, false, false, false]; let data = [0xff, 0x80]; - let data = &mut data.as_ref(); - - let iter = BooleanDecoder::new(data) - .collect::>>() - .unwrap(); - assert_eq!( - iter, - vec![true, false, false, false, false, false, false, false] - ) + let mut decoder = BooleanDecoder::new(data); + let mut actual = vec![true; expected.len()]; + decoder.decode(&mut actual).unwrap(); + assert_eq!(actual, expected) } } diff --git a/src/encoding/byte.rs b/src/encoding/byte.rs index 86188417..6b1c4d57 100644 --- a/src/encoding/byte.rs +++ b/src/encoding/byte.rs @@ -231,21 +231,8 @@ impl ByteRleDecoder { } } -impl Iterator for ByteRleDecoder { - type Item = Result; - - fn next(&mut self) -> Option { - if self.index == self.leftovers.len() { - self.read_values().ok()?; - } - let value = self.leftovers[self.index] as i8; - self.index += 1; - Some(Ok(value)) - } -} - impl PrimitiveValueDecoder for ByteRleDecoder { - // TODO: can probably implement this better, just copying from iter for now + // TODO: can probably implement this better fn decode(&mut self, out: &mut [i8]) -> Result<()> { for x in out.iter_mut() { if self.index == self.leftovers.len() { @@ -266,28 +253,29 @@ mod tests { use proptest::prelude::*; + // TODO: have tests varying the out buffer, to ensure decode() is called + // multiple times + + fn test_helper(data: &[u8], expected: &[i8]) { + let mut reader = ByteRleDecoder::new(Cursor::new(data)); + let mut actual = vec![0; expected.len()]; + reader.decode(&mut actual).unwrap(); + assert_eq!(actual, expected); + } + #[test] fn reader_test() { let data = [0x61u8, 0x00]; - let data = &mut data.as_ref(); - let iter = ByteRleDecoder::new(data) - .collect::>>() - .unwrap(); - assert_eq!(iter, vec![0; 100]); + let expected = [0; 100]; + test_helper(&data, &expected); let data = [0x01, 0x01]; - let data = &mut data.as_ref(); - let iter = ByteRleDecoder::new(data) - .collect::>>() - .unwrap(); - assert_eq!(iter, vec![1; 4]); + let expected = [1; 4]; + test_helper(&data, &expected); let data = [0xfe, 0x44, 0x45]; - let data = &mut data.as_ref(); - let iter = ByteRleDecoder::new(data) - .collect::>>() - .unwrap(); - assert_eq!(iter, vec![0x44, 0x45]); + let expected = [0x44, 0x45]; + test_helper(&data, &expected); } fn roundtrip_byte_rle_helper(values: &[i8]) -> Result> { @@ -297,8 +285,10 @@ mod tests { let buf = writer.take_inner(); let mut cursor = Cursor::new(&buf); - let reader = ByteRleDecoder::new(&mut cursor); - reader.into_iter().collect::>>() + let mut reader = ByteRleDecoder::new(&mut cursor); + let mut actual = vec![0; values.len()]; + reader.decode(&mut actual)?; + Ok(actual) } #[derive(Debug, Clone)] diff --git a/src/encoding/decimal.rs b/src/encoding/decimal.rs index 03e673df..dab6048f 100644 --- a/src/encoding/decimal.rs +++ b/src/encoding/decimal.rs @@ -24,26 +24,11 @@ use super::{util::read_varint_zigzagged, PrimitiveValueDecoder, SignedEncoding}; /// Read stream of zigzag encoded varints as i128 (unbound). pub struct UnboundedVarintStreamDecoder { reader: R, - remaining: usize, } impl UnboundedVarintStreamDecoder { - pub fn new(reader: R, expected_length: usize) -> Self { - Self { - reader, - remaining: expected_length, - } - } -} - -impl Iterator for UnboundedVarintStreamDecoder { - type Item = Result; - - fn next(&mut self) -> Option { - (self.remaining > 0).then(|| { - self.remaining -= 1; - read_varint_zigzagged::(&mut self.reader) - }) + pub fn new(reader: R) -> Self { + Self { reader } } } diff --git a/src/encoding/float.rs b/src/encoding/float.rs index 23dcc46c..95e20ae8 100644 --- a/src/encoding/float.rs +++ b/src/encoding/float.rs @@ -83,15 +83,6 @@ impl PrimitiveValueDecoder for FloatDecoder } } -// TODO: remove this, currently only needed as we move from iterator to PrimitiveValueDecoder -impl Iterator for FloatDecoder { - type Item = Result; - - fn next(&mut self) -> Option { - unimplemented!() - } -} - /// No special run encoding for floats/doubles, they are stored as their IEEE 754 floating /// point bit layout. This encoder simply copies incoming floats/doubles to its internal /// byte buffer. diff --git a/src/encoding/mod.rs b/src/encoding/mod.rs index f42e76c0..cb597675 100644 --- a/src/encoding/mod.rs +++ b/src/encoding/mod.rs @@ -75,7 +75,7 @@ where fn take_inner(&mut self) -> Bytes; } -pub trait PrimitiveValueDecoder: Iterator> { +pub trait PrimitiveValueDecoder { /// Decode out.len() values into out at a time, failing if it cannot fill /// the buffer. fn decode(&mut self, out: &mut [V]) -> Result<()>; @@ -408,15 +408,6 @@ mod tests { /// Emits numbers increasing from 0. struct DummyDecoder; - // TODO: remove this eventually - impl Iterator for DummyDecoder { - type Item = Result; - - fn next(&mut self) -> Option { - todo!() - } - } - impl PrimitiveValueDecoder for DummyDecoder { fn decode(&mut self, out: &mut [i32]) -> Result<()> { let values = (0..out.len()).map(|x| x as i32).collect::>(); diff --git a/src/encoding/rle_v1.rs b/src/encoding/rle_v1.rs index 6fd1d277..dcee1595 100644 --- a/src/encoding/rle_v1.rs +++ b/src/encoding/rle_v1.rs @@ -95,30 +95,6 @@ impl RleReaderV1 { } } -impl Iterator for RleReaderV1 { - type Item = Result; - - fn next(&mut self) -> Option { - if self.current_head >= self.decoded_ints.len() { - self.current_head = 0; - self.decoded_ints.clear(); - match self.decode_batch() { - Ok(more) => { - if !more { - return None; - } - } - Err(err) => { - return Some(Err(err)); - } - } - } - let result = self.decoded_ints[self.current_head]; - self.current_head += 1; - Some(Ok(result)) - } -} - impl PrimitiveValueDecoder for RleReaderV1 { // TODO: this is exact duplicate from RLEv2 version; deduplicate it fn decode(&mut self, out: &mut [N]) -> Result<()> { @@ -171,30 +147,31 @@ mod tests { use super::*; + fn test_helper(data: &[u8], expected: &[i64]) { + let mut reader = RleReaderV1::::new(Cursor::new(data)); + let mut actual = vec![0; expected.len()]; + reader.decode(&mut actual).unwrap(); + assert_eq!(actual, expected); + } + #[test] fn test_run() -> Result<()> { - let input = [0x61, 0x00, 0x07]; - let decoder = RleReaderV1::::new(Cursor::new(&input)); - let expected = vec![7; 100]; - let actual = decoder.collect::>>()?; - assert_eq!(actual, expected); + let data = [0x61, 0x00, 0x07]; + let expected = [7; 100]; + test_helper(&data, &expected); - let input = [0x61, 0xff, 0x64]; - let decoder = RleReaderV1::::new(Cursor::new(&input)); + let data = [0x61, 0xff, 0x64]; let expected = (1..=100).rev().collect::>(); - let actual = decoder.collect::>>()?; - assert_eq!(actual, expected); + test_helper(&data, &expected); Ok(()) } #[test] fn test_literal() -> Result<()> { - let input = [0xfb, 0x02, 0x03, 0x06, 0x07, 0xb]; - let decoder = RleReaderV1::::new(Cursor::new(&input)); + let data = [0xfb, 0x02, 0x03, 0x06, 0x07, 0xb]; let expected = vec![2, 3, 6, 7, 11]; - let actual = decoder.collect::>>()?; - assert_eq!(actual, expected); + test_helper(&data, &expected); Ok(()) } diff --git a/src/encoding/rle_v2/mod.rs b/src/encoding/rle_v2/mod.rs index 8d7068f6..38b35b7b 100644 --- a/src/encoding/rle_v2/mod.rs +++ b/src/encoding/rle_v2/mod.rs @@ -101,29 +101,6 @@ impl RleReaderV2 { } } -// TODO: remove this, currently only needed as we move from iterator to PrimitiveValueDecoder -impl Iterator for RleReaderV2 { - type Item = Result; - - fn next(&mut self) -> Option { - if self.current_head >= self.decoded_ints.len() { - match self.decode_batch() { - Ok(more) => { - if !more { - return None; - } - } - Err(err) => { - return Some(Err(err)); - } - } - } - let result = self.decoded_ints[self.current_head]; - self.current_head += 1; - Some(Ok(result)) - } -} - impl PrimitiveValueDecoder for RleReaderV2 { fn decode(&mut self, out: &mut [N]) -> Result<()> { let available = &self.decoded_ints[self.current_head..]; @@ -571,136 +548,88 @@ mod tests { use super::*; + // TODO: have tests varying the out buffer, to ensure decode() is called + // multiple times + + fn test_helper(data: &[u8], expected: &[i64]) { + let mut reader = RleReaderV2::::new(Cursor::new(data)); + let mut actual = vec![0; expected.len()]; + reader.decode(&mut actual).unwrap(); + assert_eq!(actual, expected); + } + #[test] fn reader_test() { let data = [2, 1, 64, 5, 80, 1, 1]; let expected = [1, 1, 1, 1, 1, 0, 1, 0, 1, 0, 0, 1, 1, 1, 1]; - - let cursor = Cursor::new(data); - let reader = RleReaderV2::::new(cursor); - let a = reader.collect::>>().unwrap(); - assert_eq!(a, expected); + test_helper::(&data, &expected); // direct let data = [0x5e, 0x03, 0x5c, 0xa1, 0xab, 0x1e, 0xde, 0xad, 0xbe, 0xef]; let expected = [23713, 43806, 57005, 48879]; - - let cursor = Cursor::new(data); - let reader = RleReaderV2::::new(cursor); - let a = reader.collect::>>().unwrap(); - assert_eq!(a, expected); + test_helper::(&data, &expected); // patched base let data = [ 102, 9, 0, 126, 224, 7, 208, 0, 126, 79, 66, 64, 0, 127, 128, 8, 2, 0, 128, 192, 8, 22, 0, 130, 0, 8, 42, ]; - let expected = [ 2030, 2000, 2020, 1000000, 2040, 2050, 2060, 2070, 2080, 2090, ]; - - let cursor = Cursor::new(data); - let reader = RleReaderV2::::new(cursor); - let a = reader.collect::>>().unwrap(); - assert_eq!(a, expected); + test_helper::(&data, &expected); let data = [196, 9, 2, 2, 74, 40, 166]; let expected = [2, 3, 5, 7, 11, 13, 17, 19, 23, 29]; - - let cursor = Cursor::new(data); - let reader = RleReaderV2::::new(cursor); - let a = reader.collect::>>().unwrap(); - assert_eq!(a, expected); + test_helper::(&data, &expected); let data = [0xc6, 0x09, 0x02, 0x02, 0x22, 0x42, 0x42, 0x46]; let expected = [2, 3, 5, 7, 11, 13, 17, 19, 23, 29]; - - let cursor = Cursor::new(data); - let reader = RleReaderV2::::new(cursor); - let a = reader.collect::>>().unwrap(); - assert_eq!(a, expected); + test_helper::(&data, &expected); let data = [7, 1]; let expected = [1, 1, 1, 1, 1, 1, 1, 1, 1, 1]; - - let cursor = Cursor::new(data); - let reader = RleReaderV2::::new(cursor); - let a = reader.collect::>>().unwrap(); - assert_eq!(a, expected); + test_helper::(&data, &expected); } #[test] fn short_repeat() { - // [10000, 10000, 10000, 10000, 10000] - let data: [u8; 3] = [0x0a, 0x27, 0x10]; - - let cursor = Cursor::new(data); - let reader = RleReaderV2::::new(cursor); - let a = reader.collect::>>().unwrap(); - - assert_eq!(a, vec![10000, 10000, 10000, 10000, 10000]); + let data = [0x0a, 0x27, 0x10]; + let expected = [10000, 10000, 10000, 10000, 10000]; + test_helper::(&data, &expected); } #[test] fn direct() { - // [23713, 43806, 57005, 48879] - let data: [u8; 10] = [0x5e, 0x03, 0x5c, 0xa1, 0xab, 0x1e, 0xde, 0xad, 0xbe, 0xef]; - - let cursor = Cursor::new(data); - let reader = RleReaderV2::::new(cursor); - let a = reader.collect::>>().unwrap(); - - assert_eq!(a, vec![23713, 43806, 57005, 48879]); + let data = [0x5e, 0x03, 0x5c, 0xa1, 0xab, 0x1e, 0xde, 0xad, 0xbe, 0xef]; + let expected = [23713, 43806, 57005, 48879]; + test_helper::(&data, &expected); } #[test] fn direct_signed() { - // [23713, 43806, 57005, 48879] let data = [110, 3, 0, 185, 66, 1, 86, 60, 1, 189, 90, 1, 125, 222]; - - let cursor = Cursor::new(data); - let reader = RleReaderV2::::new(cursor); - let a = reader.collect::>>().unwrap(); - - assert_eq!(a, vec![23713, 43806, 57005, 48879]); + let expected = [23713, 43806, 57005, 48879]; + test_helper::(&data, &expected); } #[test] fn delta() { - // [2, 3, 5, 7, 11, 13, 17, 19, 23, 29] - // 0x22 = 34 - // 0x42 = 66 - // 0x46 = 70 - let data: [u8; 8] = [0xc6, 0x09, 0x02, 0x02, 0x22, 0x42, 0x42, 0x46]; - - let cursor = Cursor::new(data); - let reader = RleReaderV2::::new(cursor); - let a = reader.collect::>>().unwrap(); - - assert_eq!(a, vec![2, 3, 5, 7, 11, 13, 17, 19, 23, 29]); + let data = [0xc6, 0x09, 0x02, 0x02, 0x22, 0x42, 0x42, 0x46]; + let expected = [2, 3, 5, 7, 11, 13, 17, 19, 23, 29]; + test_helper::(&data, &expected); } #[test] fn patched_base() { - let data = vec![ + let data = [ 0x8e, 0x09, 0x2b, 0x21, 0x07, 0xd0, 0x1e, 0x00, 0x14, 0x70, 0x28, 0x32, 0x3c, 0x46, 0x50, 0x5a, 0xfc, 0xe8, ]; - - let expected = vec![ + let expected = [ 2030, 2000, 2020, 1000000, 2040, 2050, 2060, 2070, 2080, 2090, ]; - - let cursor = Cursor::new(data); - let reader = RleReaderV2::::new(cursor); - let a = reader - .collect::>>() - .unwrap() - .into_iter() - .collect::>(); - - assert_eq!(a, expected); + test_helper::(&data, &expected); } #[test] @@ -720,7 +649,6 @@ mod tests { 91, 198, 1, 2, 0, 32, 144, 64, 0, 12, 2, 8, 24, 0, 64, 0, 1, 0, 0, 8, 48, 51, 128, 0, 2, 12, 16, 32, 32, 71, 128, 19, 76, ]; - // expected data generated from Orc Java implementation let expected = vec![ 20, 2, 3, 2, 1, 3, 17, 71, 35, 2, 1, 139, 2, 2, 3, 1783, 475, 2, 1, 1, 3, 1, 3, 2, 32, @@ -734,12 +662,7 @@ mod tests { 2, 1, 5, 10, 3, 1, 1, 13, 2, 3, 4, 1, 3, 1, 1, 2, 1, 1, 2, 4, 2, 207, 1, 1, 2, 4, 3, 3, 2, 2, 16, ]; - - let cursor = Cursor::new(data); - let reader = RleReaderV2::::new(cursor); - let a = reader.collect::>>().unwrap(); - - assert_eq!(a, expected); + test_helper::(&data, &expected); } // TODO: be smarter about prop test here, generate different patterns of ints @@ -752,11 +675,11 @@ mod tests { writer.write_slice(values); let data = writer.take_inner(); - let cursor = Cursor::new(data); - let reader = RleReaderV2::::new(cursor); - let out = reader.collect::>>()?; + let mut reader = RleReaderV2::::new(Cursor::new(data)); + let mut actual = vec![N::zero(); values.len()]; + reader.decode(&mut actual).unwrap(); - Ok(out) + Ok(actual) } proptest! { diff --git a/src/encoding/timestamp.rs b/src/encoding/timestamp.rs index 1a902faa..d0113111 100644 --- a/src/encoding/timestamp.rs +++ b/src/encoding/timestamp.rs @@ -49,22 +49,6 @@ impl TimestampDecoder { } } -// TODO: remove this -impl Iterator for TimestampDecoder { - type Item = Result; - - fn next(&mut self) -> Option { - // TODO: throw error for mismatched stream lengths? - let (seconds_since_orc_base, nanoseconds) = - self.data.by_ref().zip(self.secondary.by_ref()).next()?; - Some(decode_timestamp::( - self.base_from_epoch, - seconds_since_orc_base, - nanoseconds, - )) - } -} - impl PrimitiveValueDecoder for TimestampDecoder { fn decode(&mut self, out: &mut [T::Native]) -> Result<()> { // TODO: can probably optimize, reuse buffers? @@ -75,11 +59,8 @@ impl PrimitiveValueDecoder for TimestampDecode for (index, (&seconds_since_orc_base, &nanoseconds)) in data.iter().zip(secondary.iter()).enumerate() { - out[index] = decode_timestamp::( - self.base_from_epoch, - Ok(seconds_since_orc_base), - Ok(nanoseconds), - )?; + out[index] = + decode_timestamp::(self.base_from_epoch, seconds_since_orc_base, nanoseconds)?; } Ok(()) } @@ -108,22 +89,6 @@ impl TimestampNanosecondAsDecimalDecoder { } } -// TODO: remove this -impl Iterator for TimestampNanosecondAsDecimalDecoder { - type Item = Result; - - fn next(&mut self) -> Option { - // TODO: throw error for mismatched stream lengths? - let (seconds_since_orc_base, nanoseconds) = - self.data.by_ref().zip(self.secondary.by_ref()).next()?; - Some(decode_timestamp_as_i128( - self.base_from_epoch, - seconds_since_orc_base, - nanoseconds, - )) - } -} - impl PrimitiveValueDecoder for TimestampNanosecondAsDecimalDecoder { fn decode(&mut self, out: &mut [i128]) -> Result<()> { // TODO: can probably optimize, reuse buffers? @@ -134,24 +99,17 @@ impl PrimitiveValueDecoder for TimestampNanosecondAsDecimalDecoder { for (index, (&seconds_since_orc_base, &nanoseconds)) in data.iter().zip(secondary.iter()).enumerate() { - out[index] = decode_timestamp_as_i128( - self.base_from_epoch, - Ok(seconds_since_orc_base), - Ok(nanoseconds), - )?; + out[index] = + decode_timestamp_as_i128(self.base_from_epoch, seconds_since_orc_base, nanoseconds); } Ok(()) } } -fn decode( - base: i64, - seconds_since_orc_base: Result, - nanoseconds: Result, -) -> Result<(i128, i64, u64)> { - let data = seconds_since_orc_base?; +fn decode(base: i64, seconds_since_orc_base: i64, nanoseconds: i64) -> (i128, i64, u64) { + let data = seconds_since_orc_base; // TODO: is this a safe cast? - let mut nanoseconds = nanoseconds? as u64; + let mut nanoseconds = nanoseconds as u64; // Last 3 bits indicate how many trailing zeros were truncated let zeros = nanoseconds & 0x7; nanoseconds >>= 3; @@ -175,16 +133,16 @@ fn decode( // while we encode them as a single i64 of nanoseconds in Arrow. let nanoseconds_since_epoch = (seconds as i128 * NANOSECONDS_IN_SECOND as i128) + (nanoseconds as i128); - Ok((nanoseconds_since_epoch, seconds, nanoseconds)) + (nanoseconds_since_epoch, seconds, nanoseconds) } fn decode_timestamp( base: i64, - seconds_since_orc_base: Result, - nanoseconds: Result, + seconds_since_orc_base: i64, + nanoseconds: i64, ) -> Result { let (nanoseconds_since_epoch, seconds, nanoseconds) = - decode(base, seconds_since_orc_base, nanoseconds)?; + decode(base, seconds_since_orc_base, nanoseconds); let nanoseconds_in_timeunit = match T::UNIT { TimeUnit::Second => 1_000_000_000, @@ -219,11 +177,7 @@ fn decode_timestamp( Ok(num_since_epoch) } -fn decode_timestamp_as_i128( - base: i64, - seconds_since_orc_base: Result, - nanoseconds: Result, -) -> Result { - let (nanoseconds_since_epoch, _, _) = decode(base, seconds_since_orc_base, nanoseconds)?; - Ok(nanoseconds_since_epoch) +fn decode_timestamp_as_i128(base: i64, seconds_since_orc_base: i64, nanoseconds: i64) -> i128 { + let (nanoseconds_since_epoch, _, _) = decode(base, seconds_since_orc_base, nanoseconds); + nanoseconds_since_epoch }