From d438c1dd855f2f553507cd3cafcb7fe559298186 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Thu, 9 Nov 2023 12:24:32 +0000 Subject: [PATCH] refactor: refactor byte/boolean iter --- src/arrow_reader/column/boolean.rs | 4 +- src/arrow_reader/column/present.rs | 4 +- src/arrow_reader/column/tinyint.rs | 4 +- src/reader/decode/boolean_rle.rs | 173 ++++++----------------------- src/reader/decode/byte_rle.rs | 22 +--- 5 files changed, 40 insertions(+), 167 deletions(-) diff --git a/src/arrow_reader/column/boolean.rs b/src/arrow_reader/column/boolean.rs index 18df13f0..83c72608 100644 --- a/src/arrow_reader/column/boolean.rs +++ b/src/arrow_reader/column/boolean.rs @@ -9,14 +9,12 @@ use crate::reader::decode::boolean_rle::BooleanIter; pub fn new_boolean_iter(column: &Column, stripe: &Stripe) -> Result> { let present = new_present_iter(column, stripe)?.collect::>>()?; - let rows: usize = present.iter().filter(|&p| *p).count(); let iter = stripe .stream_map .get(column, Kind::Data) .map(|reader| { - Box::new(BooleanIter::new(reader, rows)) - as Box> + Send> + Box::new(BooleanIter::new(reader)) as Box> + Send> }) .context(InvalidColumnSnafu { name: &column.name })?; diff --git a/src/arrow_reader/column/present.rs b/src/arrow_reader/column/present.rs index 5864714c..0a0c2fea 100644 --- a/src/arrow_reader/column/present.rs +++ b/src/arrow_reader/column/present.rs @@ -12,9 +12,7 @@ pub fn new_present_iter( let iter = stripe .stream_map .get(column, Kind::Present) - .map(|reader| { - Box::new(BooleanIter::new(reader, rows)) as Box>> - }) + .map(|reader| Box::new(BooleanIter::new(reader)) as Box>>) .unwrap_or_else(|| Box::new(DummyPresentIter::new(rows))); Ok(iter) diff --git a/src/arrow_reader/column/tinyint.rs b/src/arrow_reader/column/tinyint.rs index c948ca36..29332743 100644 --- a/src/arrow_reader/column/tinyint.rs +++ b/src/arrow_reader/column/tinyint.rs @@ -9,14 +9,12 @@ use crate::reader::decode::byte_rle::ByteRleIter; pub fn new_i8_iter(column: &Column, stripe: &Stripe) -> Result> { let present = new_present_iter(column, stripe)?.collect::>>()?; - let rows: usize = present.iter().filter(|&p| *p).count(); let iter = stripe .stream_map .get(column, Kind::Data) .map(|reader| { - Box::new(ByteRleIter::new(reader, rows).map(|value| value.map(|value| value as i8))) - as _ + Box::new(ByteRleIter::new(reader).map(|value| value.map(|value| value as i8))) as _ }) .context(InvalidColumnSnafu { name: &column.name })?; diff --git a/src/reader/decode/boolean_rle.rs b/src/reader/decode/boolean_rle.rs index fc20629a..2cf3670c 100644 --- a/src/reader/decode/boolean_rle.rs +++ b/src/reader/decode/boolean_rle.rs @@ -1,89 +1,35 @@ use std::io::Read; -use snafu::ResultExt; +use crate::error::Result; -use crate::error::{self, Result}; -use crate::reader::decode::util::read_u8; - -#[derive(Debug, Copy, Clone, PartialEq)] -#[allow(clippy::large_enum_variant)] -pub enum BooleanRun { - Run(u8, u16), - Literals([u8; 255]), -} - -pub struct BooleanRleRunIter { - reader: R, -} - -impl BooleanRleRunIter { - pub fn new(reader: R) -> Self { - Self { reader } - } - - pub fn into_inner(self) -> R { - self.reader - } -} - -fn read_literals(reader: &mut R, header: i8) -> Result<[u8; 255]> { - let length = (-header) as usize; - - let mut literals = [0u8; 255]; - - reader - .take(length as u64) - .read_exact(&mut literals[..length]) - .context(error::IoSnafu)?; - - Ok(literals) -} - -impl Iterator for BooleanRleRunIter { - type Item = Result; - - #[inline] - fn next(&mut self) -> Option { - let header = match read_u8(&mut self.reader) { - Ok(header) => header as i8, - Err(e) => return Some(Err(e)), - }; - if header < 0 { - Some(read_literals(&mut self.reader, header).map(BooleanRun::Literals)) - } else { - let length = header as u16 + 3; - // this is not ok - it may require more than one byte - let value = match read_u8(&mut self.reader) { - Ok(value) => value, - Err(e) => return Some(Err(e)), - }; - Some(Ok(BooleanRun::Run(value, length))) - } - } -} +use super::byte_rle::ByteRleIter; pub struct BooleanIter { - iter: BooleanRleRunIter, - current: Option, - position: u8, - byte_position: usize, - remaining: usize, + iter: ByteRleIter, + data: u8, + bits_in_data: usize, } impl BooleanIter { - pub fn new(reader: R, length: usize) -> Self { + pub fn new(reader: R) -> Self { Self { - iter: BooleanRleRunIter::new(reader), - current: None, - position: 0, - byte_position: 0, - remaining: length, + iter: ByteRleIter::new(reader), + bits_in_data: 0, + data: 0, } } pub fn into_inner(self) -> R { self.iter.into_inner() } + + pub fn value(&mut self) -> bool { + let value = (self.data & 0x80) != 0; + self.data <<= 1; + self.bits_in_data -= 1; + + value + } } impl Iterator for BooleanIter { @@ -91,70 +37,21 @@ impl Iterator for BooleanIter { #[inline] fn next(&mut self) -> Option { - if let Some(run) = &self.current { - match run { - BooleanRun::Run(value, repetitions) => { - let repetitions = *repetitions; - let mask = 128u8 >> self.position; - let result = value & mask == mask; - self.position += 1; - if self.remaining == 0 { - self.current = None; - return None; - } else { - self.remaining -= 1; - } - if self.position == 8 { - if repetitions == 0 { - self.current = None; - } else { - self.current = Some(BooleanRun::Run(*value, repetitions - 1)); - } - self.position = 0; - } - Some(Ok(result)) - } - BooleanRun::Literals(bytes) => { - let mask = 128u8 >> self.position; - let result = bytes[self.byte_position] & mask == mask; - self.position += 1; - if self.remaining == 0 { - self.current = None; - return None; - } else { - self.remaining -= 1; - } - if self.position == 8 { - if bytes.len() == 1 { - self.current = None; - self.byte_position = 0; - } else { - self.byte_position += 1; - } - self.position = 0; - } - Some(Ok(result)) - } - } - } else if self.remaining > 0 { - match self.iter.next()? { - Ok(run) => { - self.current = Some(run); - self.next() - } - Err(e) => { - self.remaining = 0; - Some(Err(e)) + // read more data if necessary + if self.bits_in_data == 0 { + match self.iter.next() { + Some(Ok(data)) => { + self.data = data; + self.bits_in_data = 8; + Some(Ok(self.value())) } + Some(Err(err)) => Some(Err(err)), + None => None, } } else { - None + Some(Ok(self.value())) } } - - fn size_hint(&self) -> (usize, Option) { - (self.remaining, Some(self.remaining)) - } } #[cfg(test)] @@ -167,10 +64,8 @@ mod test { let data = &mut data.as_ref(); - let iter = BooleanIter::new(data, 100) - .collect::>>() - .unwrap(); - assert_eq!(iter, vec![false; 100]) + let iter = BooleanIter::new(data).collect::>>().unwrap(); + assert_eq!(iter, vec![false; 800]) } #[test] @@ -179,9 +74,7 @@ mod test { let data = &mut data.as_ref(); - let iter = BooleanIter::new(data, 16) - .collect::>>() - .unwrap(); + let iter = BooleanIter::new(data).collect::>>().unwrap(); assert_eq!( iter, vec![ @@ -198,12 +91,10 @@ mod test { let data = &mut data.as_ref(); - let iter = BooleanIter::new(data, 8) - .collect::>>() - .unwrap(); + let iter = BooleanIter::new(data).collect::>>().unwrap(); assert_eq!( iter, - vec![true, false, false, false, false, false, false, false,] + vec![true, false, false, false, false, false, false, false] ) } } diff --git a/src/reader/decode/byte_rle.rs b/src/reader/decode/byte_rle.rs index 8eb6969e..83081e75 100644 --- a/src/reader/decode/byte_rle.rs +++ b/src/reader/decode/byte_rle.rs @@ -12,18 +12,16 @@ pub struct ByteRleIter { num_literals: usize, used: usize, repeat: bool, - remaining: usize, } impl ByteRleIter { - pub fn new(reader: R, length: usize) -> Self { + pub fn new(reader: R) -> Self { Self { reader, literals: [0u8; MAX_LITERAL_SIZE], num_literals: 0, used: 0, repeat: false, - remaining: length, } } @@ -59,13 +57,10 @@ impl Iterator for ByteRleIter { type Item = Result; fn next(&mut self) -> Option { - if self.remaining == 0 { - return None; - } if self.used == self.num_literals { match self.read_values() { Ok(_) => {} - Err(err) => return Some(Err(err)), + Err(_err) => return None, } } @@ -75,7 +70,6 @@ impl Iterator for ByteRleIter { self.literals[self.used] }; self.used += 1; - self.remaining -= 1; Some(Ok(result)) } } @@ -90,9 +84,7 @@ mod test { let data = &mut data.as_ref(); - let iter = ByteRleIter::new(data, 100) - .collect::>>() - .unwrap(); + let iter = ByteRleIter::new(data).collect::>>().unwrap(); assert_eq!(iter, vec![0; 100]); @@ -100,9 +92,7 @@ mod test { let data = &mut data.as_ref(); - let iter = ByteRleIter::new(data, 4) - .collect::>>() - .unwrap(); + let iter = ByteRleIter::new(data).collect::>>().unwrap(); assert_eq!(iter, vec![1; 4]); @@ -110,9 +100,7 @@ mod test { let data = &mut data.as_ref(); - let iter = ByteRleIter::new(data, 2) - .collect::>>() - .unwrap(); + let iter = ByteRleIter::new(data).collect::>>().unwrap(); assert_eq!(iter, vec![0x44, 0x45]); }