Skip to content

Commit

Permalink
Impl RleV1Encoder for integer (#37)
Browse files Browse the repository at this point in the history
* impl RleV1Encoder

* add test

* use zero either default

* refactor RleV1Encoder in stateful code

* clean clippy

* fix and add more test

* add test

* avoid new vec

* add comments

* fix

* fix

* fix

* use Rc<RefCell> to alloc buffer on heap and reuse it

* simply code

* fix
  • Loading branch information
suxiaogang223 authored Jan 19, 2025
1 parent 03429d6 commit 947f4a4
Show file tree
Hide file tree
Showing 2 changed files with 228 additions and 18 deletions.
1 change: 0 additions & 1 deletion src/encoding/byte.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ impl ByteRleEncoder {
self.tail_run_length = 1;
} else if let Some(run_value) = self.run_value {
// Run mode

if value == run_value {
// Continue buffering for Run sequence, flushing if reaching max length
self.num_literals += 1;
Expand Down
245 changes: 228 additions & 17 deletions src/encoding/integer/rle_v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,30 @@

//! Handling decoding of Integer Run Length Encoded V1 data in ORC files
use std::{io::Read, marker::PhantomData};
use std::{io::Read, marker::PhantomData, ops::RangeInclusive};

use bytes::{BufMut, BytesMut};
use snafu::OptionExt;

use crate::{
encoding::{
rle::GenericRle,
util::{read_u8, try_read_u8},
PrimitiveValueEncoder,
},
error::{OutOfSpecSnafu, Result},
memory::EstimateMemory,
};

use super::{util::read_varint_zigzagged, EncodingSign, NInt};
use super::{
util::{read_varint_zigzagged, write_varint_zigzagged},
EncodingSign, NInt,
};

const MAX_RUN_LENGTH: usize = 130;
const MIN_RUN_LENGTH: usize = 3;
const MAX_RUN_LENGTH: usize = 127 + MIN_RUN_LENGTH;
const MAX_LITERAL_LENGTH: usize = 128;
const DELAT_RANGE: RangeInclusive<i64> = -128..=127;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum EncodingType {
Expand Down Expand Up @@ -147,6 +156,190 @@ impl<N: NInt, R: Read, S: EncodingSign> GenericRle<N> for RleV1Decoder<N, R, S>
}
}

/// Represents the state of the RLE V1 encoder.
///
/// The encoder can be in one of three states:
///
/// 1. `Empty`: The buffer is empty and there are no values to encode.
/// 2. `Literal`: The encoder is in literal mode, with values saved in buffer.
/// 3. `Run`: The encoder is in run mode, with a run value, delta, and length.
#[derive(Debug, Clone, Eq, PartialEq)]
enum RleV1EncodingState<N: NInt> {
Empty,
Literal,
Run { value: N, delta: i8, length: usize },
}

impl<N: NInt> Default for RleV1EncodingState<N> {
fn default() -> Self {
Self::Empty
}
}

/// `RleV1Encoder` is responsible for encoding a stream of integers using the Run Length Encoding (RLE) version 1 format.
pub struct RleV1Encoder<N: NInt, S: EncodingSign> {
writer: BytesMut,
state: RleV1EncodingState<N>,
buffer: Vec<N>,
sign: PhantomData<S>,
}

impl<N: NInt, S: EncodingSign> RleV1Encoder<N, S> {
/// Processes a given value and updates the encoder state accordingly.
///
/// The function handles three possible states of the encoder:
///
/// 1. `RleV1EncoderState::Empty`:
/// - Transitions to the `Literal` state with the given value as the first element in the buffer.
///
/// 2. `RleV1EncoderState::Run`:
/// - If the value continues the current run (i.e., it matches the expected value based on the run's delta and length),
/// the run length is incremented. If the run length reaches `MAX_RUN_LENGTH`, the run is written out and the state
/// transitions to `Empty`.
/// - If the value does not continue the current run, the existing run is written out and the state transitions to
/// `Literal` with the new value as the first element in the buffer.
///
/// 3. `RleV1EncoderState::Literal`:
/// - The value is added to the buffer. If the buffer length reaches `MAX_LITERAL_LENGTH`, the buffer is written out
/// and the state transitions to `Empty`.
/// - If the buffer length is at least `MIN_RUN_LENGTH` and the values in the buffer form a valid run (i.e., the deltas
/// between consecutive values are consistent and within the allowed range), the state transitions to `Run`.
/// - Otherwise, the state remains `Literal`.
///
fn process_value(&mut self, value: N) {
match &mut self.state {
RleV1EncodingState::Empty => {
// change to literal model
self.buffer.clear();
self.buffer.push(value);
self.state = RleV1EncodingState::Literal;
}
RleV1EncodingState::Literal => {
let buf = &mut self.buffer;
buf.push(value);
let length = buf.len();
let delta = (value - buf[length - 2]).as_i64();
// check if can change to run model
if length >= MIN_RUN_LENGTH
&& DELAT_RANGE.contains(&delta)
&& delta == (buf[length - 2] - buf[length - 3]).as_i64()
{
// change to run model
if length > MIN_RUN_LENGTH {
// write the left literals
write_literals::<_, S>(&mut self.writer, &buf[..(length - MIN_RUN_LENGTH)]);
}
self.state = RleV1EncodingState::Run {
value: buf[length - MIN_RUN_LENGTH],
delta: delta as i8,
length: MIN_RUN_LENGTH,
}
} else if length == MAX_LITERAL_LENGTH {
// reach buffer limit, write literals and change to empty state
write_literals::<_, S>(&mut self.writer, buf);
self.state = RleV1EncodingState::Empty;
}
// else keep literal mode
}
RleV1EncodingState::Run {
value: run_value,
delta,
length,
} => {
if run_value.as_i64() + (*delta as i64) * (*length as i64) == value.as_i64() {
// keep run model
*length += 1;
if *length == MAX_RUN_LENGTH {
// reach run limit
write_run::<_, S>(&mut self.writer, *run_value, *delta, *length);
self.state = RleV1EncodingState::Empty;
}
} else {
// write run values and change to literal model
write_run::<_, S>(&mut self.writer, *run_value, *delta, *length);
self.buffer.clear();
self.buffer.push(value);
self.state = RleV1EncodingState::Literal;
}
}
}
}

/// Flushes the current state of the encoder, writing out any buffered values.
///
/// This function handles the three possible states of the encoder:
///
/// 1. `RleV1EncoderState::Empty`:
/// - No action is needed as there are no buffered values to write.
///
/// 3. `RleV1EncoderState::Literal`:
/// - Writes out the buffered literal values.
///
/// 2. `RleV1EncoderState::Run`:
/// - Writes out the current run of values.
///
/// After calling this function, the encoder state will be reset to `Empty`.
fn flush(&mut self) {
let state = std::mem::take(&mut self.state);
match state {
RleV1EncodingState::Empty => {}
RleV1EncodingState::Literal => {
write_literals::<_, S>(&mut self.writer, &self.buffer);
}
RleV1EncodingState::Run {
value,
delta,
length,
} => {
write_run::<_, S>(&mut self.writer, value, delta, length);
}
}
}
}

fn write_run<N: NInt, S: EncodingSign>(writer: &mut BytesMut, value: N, delta: i8, length: usize) {
// write header
writer.put_u8(length as u8 - 3);
writer.put_u8(delta as u8);
// write run value
write_varint_zigzagged::<_, S>(writer, value);
}

fn write_literals<N: NInt, S: EncodingSign>(writer: &mut BytesMut, buffer: &[N]) {
// write header
writer.put_u8(-(buffer.len() as i8) as u8);
// write literals
for literal in buffer {
write_varint_zigzagged::<_, S>(writer, *literal);
}
}

impl<N: NInt, S: EncodingSign> EstimateMemory for RleV1Encoder<N, S> {
fn estimate_memory_size(&self) -> usize {
self.writer.len()
}
}

impl<N: NInt, S: EncodingSign> PrimitiveValueEncoder<N> for RleV1Encoder<N, S> {
fn new() -> Self {
Self {
writer: BytesMut::new(),
state: Default::default(),
buffer: Vec::with_capacity(MAX_LITERAL_LENGTH),
sign: Default::default(),
}
}

fn write_one(&mut self, value: N) {
self.process_value(value);
}

fn take_inner(&mut self) -> bytes::Bytes {
self.flush();
std::mem::take(&mut self.writer).into()
}
}

#[cfg(test)]
mod tests {
use std::io::Cursor;
Expand All @@ -155,32 +348,50 @@ mod tests {

use super::*;

fn test_helper(data: &[u8], expected: &[i64]) {
let mut reader = RleV1Decoder::<i64, _, UnsignedEncoding>::new(Cursor::new(data));
let mut actual = vec![0; expected.len()];
reader.decode(&mut actual).unwrap();
assert_eq!(actual, expected);
fn test_helper(original: &[i64], encoded: &[u8]) {
let mut encoder = RleV1Encoder::<i64, UnsignedEncoding>::new();
encoder.write_slice(original);
encoder.flush();
let actual_encoded = encoder.take_inner();
assert_eq!(actual_encoded, encoded);

let mut decoder = RleV1Decoder::<i64, _, UnsignedEncoding>::new(Cursor::new(encoded));
let mut actual_decoded = vec![0; original.len()];
decoder.decode(&mut actual_decoded).unwrap();
assert_eq!(actual_decoded, original);
}

#[test]
fn test_run() -> Result<()> {
let data = [0x61, 0x00, 0x07];
let expected = [7; 100];
test_helper(&data, &expected);
let original = [7; 100];
let encoded = [0x61, 0x00, 0x07];
test_helper(&original, &encoded);

let original = (1..=100).rev().collect::<Vec<_>>();
let encoded = [0x61, 0xff, 0x64];
test_helper(&original, &encoded);

let data = [0x61, 0xff, 0x64];
let expected = (1..=100).rev().collect::<Vec<_>>();
test_helper(&data, &expected);
let original = (1..=150).rev().collect::<Vec<_>>();
let encoded = [0x7f, 0xff, 0x96, 0x01, 0x11, 0xff, 0x14];
test_helper(&original, &encoded);

let original = [2, 4, 6, 8, 1, 3, 5, 7, 255];
let encoded = [0x01, 0x02, 0x02, 0x01, 0x02, 0x01, 0xff, 0xff, 0x01];
test_helper(&original, &encoded);
Ok(())
}

#[test]
fn test_literal() -> Result<()> {
let data = [0xfb, 0x02, 0x03, 0x06, 0x07, 0xb];
let expected = vec![2, 3, 6, 7, 11];
test_helper(&data, &expected);
let original = vec![2, 3, 6, 7, 11];
let encoded = [0xfb, 0x02, 0x03, 0x06, 0x07, 0xb];
test_helper(&original, &encoded);

let original = vec![2, 3, 6, 7, 11, 1, 2, 3, 0, 256];
let encoded = [
0xfb, 0x02, 0x03, 0x06, 0x07, 0x0b, 0x00, 0x01, 0x01, 0xfe, 0x00, 0x80, 0x02,
];
test_helper(&original, &encoded);
Ok(())
}
}

0 comments on commit 947f4a4

Please sign in to comment.