Skip to content

Commit

Permalink
refactor: refactor byte/boolean iter
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Nov 9, 2023
1 parent 51059f8 commit d438c1d
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 167 deletions.
4 changes: 1 addition & 3 deletions src/arrow_reader/column/boolean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,12 @@ use crate::reader::decode::boolean_rle::BooleanIter;

pub fn new_boolean_iter(column: &Column, stripe: &Stripe) -> Result<NullableIterator<bool>> {
let present = new_present_iter(column, stripe)?.collect::<Result<Vec<_>>>()?;
let rows: usize = present.iter().filter(|&p| *p).count();

let iter = stripe
.stream_map
.get(column, Kind::Data)
.map(|reader| {
Box::new(BooleanIter::new(reader, rows))
as Box<dyn Iterator<Item = Result<bool>> + Send>
Box::new(BooleanIter::new(reader)) as Box<dyn Iterator<Item = Result<bool>> + Send>
})
.context(InvalidColumnSnafu { name: &column.name })?;

Expand Down
4 changes: 1 addition & 3 deletions src/arrow_reader/column/present.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@ pub fn new_present_iter(
let iter = stripe
.stream_map
.get(column, Kind::Present)
.map(|reader| {
Box::new(BooleanIter::new(reader, rows)) as Box<dyn Iterator<Item = Result<bool>>>
})
.map(|reader| Box::new(BooleanIter::new(reader)) as Box<dyn Iterator<Item = Result<bool>>>)
.unwrap_or_else(|| Box::new(DummyPresentIter::new(rows)));

Ok(iter)
Expand Down
4 changes: 1 addition & 3 deletions src/arrow_reader/column/tinyint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,12 @@ use crate::reader::decode::byte_rle::ByteRleIter;

pub fn new_i8_iter(column: &Column, stripe: &Stripe) -> Result<NullableIterator<i8>> {
let present = new_present_iter(column, stripe)?.collect::<Result<Vec<_>>>()?;
let rows: usize = present.iter().filter(|&p| *p).count();

let iter = stripe
.stream_map
.get(column, Kind::Data)
.map(|reader| {
Box::new(ByteRleIter::new(reader, rows).map(|value| value.map(|value| value as i8)))
as _
Box::new(ByteRleIter::new(reader).map(|value| value.map(|value| value as i8))) as _
})
.context(InvalidColumnSnafu { name: &column.name })?;

Expand Down
173 changes: 32 additions & 141 deletions src/reader/decode/boolean_rle.rs
Original file line number Diff line number Diff line change
@@ -1,160 +1,57 @@
use std::io::Read;

use snafu::ResultExt;
use crate::error::Result;

use crate::error::{self, Result};
use crate::reader::decode::util::read_u8;

#[derive(Debug, Copy, Clone, PartialEq)]
#[allow(clippy::large_enum_variant)]
pub enum BooleanRun {
Run(u8, u16),
Literals([u8; 255]),
}

pub struct BooleanRleRunIter<R: Read> {
reader: R,
}

impl<R: Read> BooleanRleRunIter<R> {
pub fn new(reader: R) -> Self {
Self { reader }
}

pub fn into_inner(self) -> R {
self.reader
}
}

fn read_literals<R: Read>(reader: &mut R, header: i8) -> Result<[u8; 255]> {
let length = (-header) as usize;

let mut literals = [0u8; 255];

reader
.take(length as u64)
.read_exact(&mut literals[..length])
.context(error::IoSnafu)?;

Ok(literals)
}

impl<R: Read> Iterator for BooleanRleRunIter<R> {
type Item = Result<BooleanRun>;

#[inline]
fn next(&mut self) -> Option<Self::Item> {
let header = match read_u8(&mut self.reader) {
Ok(header) => header as i8,
Err(e) => return Some(Err(e)),
};
if header < 0 {
Some(read_literals(&mut self.reader, header).map(BooleanRun::Literals))
} else {
let length = header as u16 + 3;
// this is not ok - it may require more than one byte
let value = match read_u8(&mut self.reader) {
Ok(value) => value,
Err(e) => return Some(Err(e)),
};
Some(Ok(BooleanRun::Run(value, length)))
}
}
}
use super::byte_rle::ByteRleIter;

pub struct BooleanIter<R: Read> {
iter: BooleanRleRunIter<R>,
current: Option<BooleanRun>,
position: u8,
byte_position: usize,
remaining: usize,
iter: ByteRleIter<R>,
data: u8,
bits_in_data: usize,
}

impl<R: Read> BooleanIter<R> {
pub fn new(reader: R, length: usize) -> Self {
pub fn new(reader: R) -> Self {
Self {
iter: BooleanRleRunIter::new(reader),
current: None,
position: 0,
byte_position: 0,
remaining: length,
iter: ByteRleIter::new(reader),
bits_in_data: 0,
data: 0,
}
}

pub fn into_inner(self) -> R {
self.iter.into_inner()
}

pub fn value(&mut self) -> bool {
let value = (self.data & 0x80) != 0;
self.data <<= 1;
self.bits_in_data -= 1;

value
}
}

impl<R: Read> Iterator for BooleanIter<R> {
type Item = Result<bool>;

#[inline]
fn next(&mut self) -> Option<Self::Item> {
if let Some(run) = &self.current {
match run {
BooleanRun::Run(value, repetitions) => {
let repetitions = *repetitions;
let mask = 128u8 >> self.position;
let result = value & mask == mask;
self.position += 1;
if self.remaining == 0 {
self.current = None;
return None;
} else {
self.remaining -= 1;
}
if self.position == 8 {
if repetitions == 0 {
self.current = None;
} else {
self.current = Some(BooleanRun::Run(*value, repetitions - 1));
}
self.position = 0;
}
Some(Ok(result))
}
BooleanRun::Literals(bytes) => {
let mask = 128u8 >> self.position;
let result = bytes[self.byte_position] & mask == mask;
self.position += 1;
if self.remaining == 0 {
self.current = None;
return None;
} else {
self.remaining -= 1;
}
if self.position == 8 {
if bytes.len() == 1 {
self.current = None;
self.byte_position = 0;
} else {
self.byte_position += 1;
}
self.position = 0;
}
Some(Ok(result))
}
}
} else if self.remaining > 0 {
match self.iter.next()? {
Ok(run) => {
self.current = Some(run);
self.next()
}
Err(e) => {
self.remaining = 0;
Some(Err(e))
// read more data if necessary
if self.bits_in_data == 0 {
match self.iter.next() {
Some(Ok(data)) => {
self.data = data;
self.bits_in_data = 8;
Some(Ok(self.value()))
}
Some(Err(err)) => Some(Err(err)),

Check warning on line 48 in src/reader/decode/boolean_rle.rs

View check run for this annotation

Codecov / codecov/patch

src/reader/decode/boolean_rle.rs#L48

Added line #L48 was not covered by tests
None => None,
}
} else {
None
Some(Ok(self.value()))
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
(self.remaining, Some(self.remaining))
}
}

#[cfg(test)]
Expand All @@ -167,10 +64,8 @@ mod test {

let data = &mut data.as_ref();

let iter = BooleanIter::new(data, 100)
.collect::<Result<Vec<_>>>()
.unwrap();
assert_eq!(iter, vec![false; 100])
let iter = BooleanIter::new(data).collect::<Result<Vec<_>>>().unwrap();
assert_eq!(iter, vec![false; 800])
}

#[test]
Expand All @@ -179,9 +74,7 @@ mod test {

let data = &mut data.as_ref();

let iter = BooleanIter::new(data, 16)
.collect::<Result<Vec<_>>>()
.unwrap();
let iter = BooleanIter::new(data).collect::<Result<Vec<_>>>().unwrap();
assert_eq!(
iter,
vec![
Expand All @@ -198,12 +91,10 @@ mod test {

let data = &mut data.as_ref();

let iter = BooleanIter::new(data, 8)
.collect::<Result<Vec<_>>>()
.unwrap();
let iter = BooleanIter::new(data).collect::<Result<Vec<_>>>().unwrap();
assert_eq!(
iter,
vec![true, false, false, false, false, false, false, false,]
vec![true, false, false, false, false, false, false, false]
)
}
}
22 changes: 5 additions & 17 deletions src/reader/decode/byte_rle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,16 @@ pub struct ByteRleIter<R: Read> {
num_literals: usize,
used: usize,
repeat: bool,
remaining: usize,
}

impl<R: Read> ByteRleIter<R> {
pub fn new(reader: R, length: usize) -> Self {
pub fn new(reader: R) -> Self {
Self {
reader,
literals: [0u8; MAX_LITERAL_SIZE],
num_literals: 0,
used: 0,
repeat: false,
remaining: length,
}
}

Expand Down Expand Up @@ -59,13 +57,10 @@ impl<R: Read> Iterator for ByteRleIter<R> {
type Item = Result<u8>;

fn next(&mut self) -> Option<Self::Item> {
if self.remaining == 0 {
return None;
}
if self.used == self.num_literals {
match self.read_values() {
Ok(_) => {}
Err(err) => return Some(Err(err)),
Err(_err) => return None,
}
}

Expand All @@ -75,7 +70,6 @@ impl<R: Read> Iterator for ByteRleIter<R> {
self.literals[self.used]
};
self.used += 1;
self.remaining -= 1;
Some(Ok(result))
}
}
Expand All @@ -90,29 +84,23 @@ mod test {

let data = &mut data.as_ref();

let iter = ByteRleIter::new(data, 100)
.collect::<Result<Vec<_>>>()
.unwrap();
let iter = ByteRleIter::new(data).collect::<Result<Vec<_>>>().unwrap();

assert_eq!(iter, vec![0; 100]);

let data = [0x01, 0x01];

let data = &mut data.as_ref();

let iter = ByteRleIter::new(data, 4)
.collect::<Result<Vec<_>>>()
.unwrap();
let iter = ByteRleIter::new(data).collect::<Result<Vec<_>>>().unwrap();

assert_eq!(iter, vec![1; 4]);

let data = [0xfe, 0x44, 0x45];

let data = &mut data.as_ref();

let iter = ByteRleIter::new(data, 2)
.collect::<Result<Vec<_>>>()
.unwrap();
let iter = ByteRleIter::new(data).collect::<Result<Vec<_>>>().unwrap();

assert_eq!(iter, vec![0x44, 0x45]);
}
Expand Down

0 comments on commit d438c1d

Please sign in to comment.