Skip to content

Commit

Permalink
Refactor varint handling (#23)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jefffrey authored Nov 5, 2023
1 parent f158593 commit 9f6c216
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 63 deletions.
10 changes: 0 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,4 @@ tokio = { version = "1.28", features = [
"macros",
"rt",
] }
zigzag = "0.1"
zstd = "0.12"
4 changes: 2 additions & 2 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ pub enum Error {
source: ArrowError,
},

#[snafu(display("Unsigned VInt"))]
EofUnsignedVInt { location: Location },
#[snafu(display("Varint being decoded is too large"))]
VarintTooLarge { location: Location },

#[snafu(display("unexpected: {}", msg))]
Unexpected { location: Location, msg: String },
Expand Down
6 changes: 3 additions & 3 deletions src/reader/decode/rle_v2/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ use snafu::ensure;
use crate::error::{self, Result};
use crate::reader::decode::rle_v2::RleReaderV2;
use crate::reader::decode::util::{
read_ints, read_u8, read_vslong, read_vulong, rle_v2_delta_bit_width,
read_ints, read_u8, read_vslong, read_vulong, rle_v2_direct_bit_width,
};

impl<R: Read> RleReaderV2<R> {
pub fn read_delta_values(&mut self, header: u8) -> Result<()> {
let mut fb = (header >> 1) & 0x1f;
if fb != 0 {
fb = rle_v2_delta_bit_width(fb);
fb = rle_v2_direct_bit_width(fb);
}
let reader = &mut self.reader;
let signed = self.signed;
Expand All @@ -26,7 +26,7 @@ impl<R: Read> RleReaderV2<R> {
let first_val = if signed {
read_vslong(reader)?
} else {
read_vulong(reader)?
read_vulong(reader)? as i64
};

self.literals[self.num_literals] = first_val;
Expand Down
4 changes: 2 additions & 2 deletions src/reader/decode/rle_v2/direct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ use std::io::Read;

use crate::error::Result;
use crate::reader::decode::rle_v2::RleReaderV2;
use crate::reader::decode::util::{read_ints, read_u8, rle_v2_delta_bit_width, zigzag_decode};
use crate::reader::decode::util::{read_ints, read_u8, rle_v2_direct_bit_width, zigzag_decode};

impl<R: Read> RleReaderV2<R> {
pub fn read_direct_values(&mut self, header: u8) -> Result<()> {
let fbo = (header >> 1) & 0x1F;
let fb = rle_v2_delta_bit_width(fbo);
let fb = rle_v2_direct_bit_width(fbo);

// 9 bits for length (L) (1 to 512 values)
let second_byte = read_u8(&mut self.reader)?;
Expand Down
147 changes: 102 additions & 45 deletions src/reader/decode/util.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use std::io::Read;

use snafu::ResultExt;
use snafu::{OptionExt, ResultExt};

use crate::error::{self, Result};
use crate::error::{self, Result, VarintTooLargeSnafu};

pub fn read_u8(reader: &mut dyn Read) -> Result<u8> {
let mut byte = [0u8];
/// Read single byte
#[inline]
pub fn read_u8(reader: &mut impl Read) -> Result<u8> {
let mut byte = [0];
reader.read_exact(&mut byte).context(error::IoSnafu)?;

Ok(byte[0])
}

Expand Down Expand Up @@ -43,7 +44,7 @@ pub fn read_ints(
offset: usize,
len: usize,
bit_size: usize,
r: &mut dyn Read,
r: &mut impl Read,
) -> Result<()> {
let mut bits_left = 0;
let mut current = 0;
Expand Down Expand Up @@ -93,7 +94,7 @@ fn unrolled_unpack_1(
buffer: &mut [i64],
offset: usize,
len: usize,
reader: &mut dyn Read,
reader: &mut impl Read,
) -> Result<()> {
let num_hops = 8;
let remainder = len % num_hops;
Expand Down Expand Up @@ -129,7 +130,7 @@ fn unrolled_unpack_2(
buffer: &mut [i64],
offset: usize,
len: usize,
reader: &mut dyn Read,
reader: &mut impl Read,
) -> Result<()> {
let num_hops = 4;
let remainder = len % num_hops;
Expand Down Expand Up @@ -159,7 +160,7 @@ fn unrolled_unpack_4(
buffer: &mut [i64],
offset: usize,
len: usize,
reader: &mut dyn Read,
reader: &mut impl Read,
) -> Result<()> {
let num_hops = 2;
let remainder = len % num_hops;
Expand Down Expand Up @@ -483,12 +484,6 @@ fn read_long_be8(read_buffer: &[u8], rb_offset: usize) -> i64 {
+ i64::from(read_buffer[rb_offset + 7])
}

// zigzag_decode decodes an unsigned zig-zag encoded integer into a signed
// integer.
pub fn zigzag_decode(val: u64) -> i64 {
zigzag::ZigZag::decode(val)
}

pub fn rle_v2_direct_bit_width(value: u8) -> u8 {
match value {
0..=23 => value + 1,
Expand All @@ -506,49 +501,111 @@ pub fn header_to_rle_v2_direct_bit_width(header: u8) -> u8 {
rle_v2_direct_bit_width(bit_width)
}

pub fn rle_v2_delta_bit_width(value: u8) -> u8 {
match value {
0..=23 => value + 1,
27 => 32,
28 => 40,
29 => 48,
30 => 56,
31 => 64,
other => todo!("{other}"),
}
}

pub fn read_vulong<R: Read>(r: &mut R) -> Result<i64> {
let mut result: i64 = 0;
/// Decode Base 128 Unsigned Varint
pub fn read_vulong(r: &mut impl Read) -> Result<u64> {
// Varints are encoded as sequence of bytes.
// Where the high bit of a byte is set to 1 if the varint
// continues into the next byte. Eventually it should terminate
// with a byte with high bit of 0.
let mut num: u64 = 0;
let mut offset = 0;
let mut b: i64 = 0x80;
while (b & 0x80) != 0 {
let mut nb = [0u8; 1];
r.read_exact(&mut nb).context(error::IoSnafu)?;
b = i64::from(nb[0]);
if b == -1 {
return error::EofUnsignedVIntSnafu {}.fail();
}
result |= (b & 0x7F) << offset;
loop {
let b = read_u8(r)?;
let is_end = b & 0x80 == 0;
// Clear continuation bit
let b = b & 0x7F;
// TODO: have check for if larger than u64?
num |= u64::from(b)
// Ensure we don't overflow
.checked_shl(offset)
.context(VarintTooLargeSnafu)?;
// Since high bit doesn't contribute to final number
// We need to shift in multiples of 7 to account for this
offset += 7;
// If we've finally reached last byte
if is_end {
break;
}
}
Ok(result)
Ok(num)
}

pub fn read_vslong<R: Read>(r: &mut R) -> Result<i64> {
let result = read_vulong(r)?;
Ok((result as u64 >> 1) as i64 ^ -(result & 1))
read_vulong(r).map(zigzag_decode)
}

pub fn zigzag_decode(unsigned: u64) -> i64 {
// Zigzag encoding stores the sign bit in the least significant bit
let without_sign_bit = (unsigned >> 1) as i64;
let sign_bit = unsigned & 1;
// If positive, sign_bit is 0
// Negating 0 and doing bitwise XOR will just return without_sign_bit
// Since A ^ 0 = A
// If negative, sign_bit is 1
// Converting to i64, and negating ...
without_sign_bit ^ -(sign_bit as i64)
}

#[cfg(test)]
mod tests {
use crate::reader::decode::util::zigzag_decode;
use std::io::Cursor;

use crate::error::Result;
use crate::reader::decode::util::{read_vulong, zigzag_decode};

#[test]
fn test_zigzag_decode() {
let data = [0i64, -1, 1, -2, 2, -3, 3, -4, 4, -5];
for (i, v) in data.into_iter().enumerate() {
assert_eq!(zigzag_decode(i as u64), v)
assert_eq!(0, zigzag_decode(0));
assert_eq!(-1, zigzag_decode(1));
assert_eq!(1, zigzag_decode(2));
assert_eq!(-2, zigzag_decode(3));
assert_eq!(2, zigzag_decode(4));
assert_eq!(-3, zigzag_decode(5));
assert_eq!(3, zigzag_decode(6));
assert_eq!(-4, zigzag_decode(7));
assert_eq!(4, zigzag_decode(8));
assert_eq!(-5, zigzag_decode(9));
}

#[test]
fn test_read_vulong() -> Result<()> {
fn test_assert(serialized: &[u8], expected: u64) -> Result<()> {
let mut reader = Cursor::new(serialized);
assert_eq!(expected, read_vulong(&mut reader)?);
Ok(())
}

test_assert(&[0x00], 0)?;
test_assert(&[0x01], 1)?;
test_assert(&[0x7f], 127)?;
test_assert(&[0x80, 0x01], 128)?;
test_assert(&[0x81, 0x01], 129)?;
test_assert(&[0xff, 0x7f], 16_383)?;
test_assert(&[0x80, 0x80, 0x01], 16_384)?;
test_assert(&[0x81, 0x80, 0x01], 16_385)?;
test_assert(
&[0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x01],
u64::MAX,
)?;

// when too large
let err = read_vulong(&mut Cursor::new(&[
0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x01,
]));
assert!(err.is_err());
assert_eq!(
"Varint being decoded is too large",
err.unwrap_err().to_string()
);

// when unexpected end to stream
let err = read_vulong(&mut Cursor::new(&[0x80, 0x80]));
assert!(err.is_err());
assert_eq!(
"Failed to read, source: failed to fill whole buffer",
err.unwrap_err().to_string()
);

Ok(())
}
}

0 comments on commit 9f6c216

Please sign in to comment.