Skip to content

Commit

Permalink
refactor: decoder accept &[u8] as input
Browse files Browse the repository at this point in the history
  • Loading branch information
forehalo committed Dec 18, 2023
1 parent d29799a commit 65fce2c
Show file tree
Hide file tree
Showing 20 changed files with 194 additions and 221 deletions.
4 changes: 2 additions & 2 deletions y-octo-node/src/doc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ impl Doc {
#[napi]
pub fn apply_update(&mut self, update: JsBuffer) -> Result<JsBuffer> {
self.doc
.apply_update_from_binary(update.to_vec())
.and_then(|u| u.into_ybinary1().map(|v| v.into()))
.apply_update_from_binary_v1(update)
.and_then(|u| u.encode_v1().map(|v| v.into()))
.map_err(anyhow::Error::from)
}

Expand Down
2 changes: 1 addition & 1 deletion y-octo/benches/apply_benchmarks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ fn apply(c: &mut Criterion) {
b.iter(|| {
use y_octo::*;
let mut doc = Doc::new();
doc.apply_update_from_binary(content.clone()).unwrap()
doc.apply_update_from_binary_v1(content.clone()).unwrap()
});
},
);
Expand Down
2 changes: 1 addition & 1 deletion y-octo/benches/update_benchmarks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ fn update(c: &mut Criterion) {
|b, content| {
b.iter(|| {
use y_octo::*;
let mut decoder = RawDecoder::new(content.clone());
let mut decoder = RawDecoder::new(content);
Update::read(&mut decoder).unwrap()
});
},
Expand Down
4 changes: 2 additions & 2 deletions y-octo/src/doc/codec/any.rs
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,7 @@ mod tests {
any.write(&mut encoder).unwrap();
let encoded = encoder.into_inner();

let mut decoder = RawDecoder::new(encoded);
let mut decoder = RawDecoder::new(&encoded);
let decoded = Any::read(&mut decoder).unwrap();

assert_eq!(any, decoded);
Expand All @@ -615,7 +615,7 @@ mod tests {
any.write(&mut encoder).unwrap();
let encoded = encoder.into_inner();

let mut decoder = RawDecoder::new(encoded);
let mut decoder = RawDecoder::new(&encoded);
let decoded = Any::read(&mut decoder).unwrap();

assert_eq!(any, &decoded);
Expand Down
3 changes: 2 additions & 1 deletion y-octo/src/doc/codec/content.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,8 +284,9 @@ mod tests {
let mut writer = RawEncoder::default();
writer.write_u8(content.get_info())?;
content.write(&mut writer)?;
let update = writer.into_inner();

let mut reader = RawDecoder::new(writer.into_inner());
let mut reader = RawDecoder::new(&update);
let tag_type = reader.read_u8()?;
assert_eq!(Content::read(&mut reader, tag_type)?, *content);

Expand Down
3 changes: 2 additions & 1 deletion y-octo/src/doc/codec/delete_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,8 @@ mod tests {
let delete_set = DeleteSet::from([(1, vec![0..10, 20..30]), (2, vec![0..5, 10..20])]);
let mut encoder = RawEncoder::default();
delete_set.write(&mut encoder).unwrap();
let mut decoder = RawDecoder::new(encoder.into_inner());
let update = encoder.into_inner();
let mut decoder = RawDecoder::new(&update);
let decoded = DeleteSet::read(&mut decoder).unwrap();
assert_eq!(delete_set, decoded);
}
Expand Down
134 changes: 88 additions & 46 deletions y-octo/src/doc/codec/io/codec_v1.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,33 @@
use std::io::Cursor;

use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};

use super::*;

#[inline]
pub fn read_with_cursor<T, F>(buffer: &mut Cursor<&[u8]>, f: F) -> JwstCodecResult<T>
where
F: FnOnce(&[u8]) -> IResult<&[u8], T>,
{
// TODO: use remaining_slice() instead after it is stabilized
let input = buffer.get_ref();
let rest_pos = buffer.position().min(input.len() as u64) as usize;
let input = &input[rest_pos..];

let (tail, result) = f(input).map_err(|e| e.map_input(|u| u.len()))?;

buffer.set_position((rest_pos + input.len() - tail.len()) as u64);
Ok(result)
}

// compatible with ydoc v1
#[derive(Clone)]
pub struct RawDecoder {
pub(super) buffer: Cursor<Vec<u8>>,
pub struct RawDecoder<'b> {
pub(super) buffer: Cursor<&'b [u8]>,
}

impl RawDecoder {
pub fn new(buffer: Vec<u8>) -> Self {
impl<'b> RawDecoder<'b> {
pub fn new(buffer: &'b [u8]) -> Self {
Self {
buffer: Cursor::new(buffer),
}
Expand All @@ -26,36 +44,59 @@ impl RawDecoder {
}
}

pub fn drain(self) -> Vec<u8> {
let pos = self.buffer.position();
let mut buf = self.buffer.into_inner();
pub fn drain(self) -> &'b [u8] {
let pos = self.buffer.position() as usize;
let buf = self.buffer.into_inner();

if pos == 0 {
buf
} else {
buf.split_off(pos as usize)
&buf[pos..]
}
}
}

impl CrdtReader for RawDecoder {
fn get_buffer(&self) -> &Cursor<Vec<u8>> {
&self.buffer
impl CrdtReader for RawDecoder<'_> {
fn is_empty(&self) -> bool {
self.buffer.position() >= self.buffer.get_ref().len() as u64
}

fn len(&self) -> u64 {
self.buffer.get_ref().len() as u64 - self.buffer.position()
}

fn get_buffer_mut(&mut self) -> &mut Cursor<Vec<u8>> {
&mut self.buffer
fn read_var_u64(&mut self) -> JwstCodecResult<u64> {
read_with_cursor(&mut self.buffer, read_var_u64)
}

#[inline(always)]
fn read_info(&mut self) -> JwstCodecResult<u8> {
self.read_u8()
fn read_var_i32(&mut self) -> JwstCodecResult<i32> {
read_with_cursor(&mut self.buffer, read_var_i32)
}

fn read_item_id(&mut self) -> JwstCodecResult<Id> {
let client = self.read_var_u64()?;
let clock = self.read_var_u64()?;
Ok(Id::new(client, clock))
fn read_var_string(&mut self) -> JwstCodecResult<String> {
read_with_cursor(&mut self.buffer, read_var_string)
}

fn read_var_buffer(&mut self) -> JwstCodecResult<Vec<u8>> {
read_with_cursor(&mut self.buffer, |i| {
read_var_buffer(i).map(|(tail, val)| (tail, val.to_vec()))
})
}

fn read_u8(&mut self) -> JwstCodecResult<u8> {
self.buffer.read_u8().map_err(reader::map_read_error)
}

fn read_f32_be(&mut self) -> JwstCodecResult<f32> {
self.buffer.read_f32::<BigEndian>().map_err(reader::map_read_error)
}

fn read_f64_be(&mut self) -> JwstCodecResult<f64> {
self.buffer.read_f64::<BigEndian>().map_err(reader::map_read_error)
}

fn read_i64_be(&mut self) -> JwstCodecResult<i64> {
self.buffer.read_i64::<BigEndian>().map_err(reader::map_read_error)
}
}

Expand All @@ -72,21 +113,31 @@ impl RawEncoder {
}

impl CrdtWriter for RawEncoder {
fn get_buffer_mut(&mut self) -> &mut Cursor<Vec<u8>> {
&mut self.buffer
fn write_var_u64(&mut self, num: u64) -> JwstCodecResult {
write_var_u64(&mut self.buffer, num).map_err(writer::map_write_error)
}

// ydoc specific write functions
#[inline(always)]
fn write_info(&mut self, num: u8) -> JwstCodecResult {
self.write_u8(num)
fn write_var_i32(&mut self, num: i32) -> JwstCodecResult {
write_var_i32(&mut self.buffer, num).map_err(writer::map_write_error)
}

fn write_item_id(&mut self, id: &Id) -> JwstCodecResult {
self.write_var_u64(id.client)?;
self.write_var_u64(id.clock)?;
fn write_var_string<S: AsRef<str>>(&mut self, s: S) -> JwstCodecResult {
write_var_string(&mut self.buffer, s).map_err(writer::map_write_error)
}
fn write_var_buffer(&mut self, buf: &[u8]) -> JwstCodecResult {
write_var_buffer(&mut self.buffer, buf).map_err(writer::map_write_error)
}
fn write_u8(&mut self, num: u8) -> JwstCodecResult {
self.buffer.write_u8(num).map_err(writer::map_write_error)?;
Ok(())
}
fn write_f32_be(&mut self, num: f32) -> JwstCodecResult {
self.buffer.write_f32::<BigEndian>(num).map_err(writer::map_write_error)
}
fn write_f64_be(&mut self, num: f64) -> JwstCodecResult {
self.buffer.write_f64::<BigEndian>(num).map_err(writer::map_write_error)
}
fn write_i64_be(&mut self, num: i64) -> JwstCodecResult {
self.buffer.write_i64::<BigEndian>(num).map_err(writer::map_write_error)
}
}

#[cfg(test)]
Expand All @@ -97,12 +148,11 @@ mod tests {
#[test]
fn test_crdt_reader() {
{
let mut reader = RawDecoder::new(vec![0xf2, 0x5]);
let mut reader = RawDecoder::new(&[0xf2, 0x5]);
assert_eq!(reader.read_var_u64().unwrap(), 754);
}
{
let buffer = vec![0x5, b'h', b'e', b'l', b'l', b'o'];
let mut reader = RawDecoder::new(buffer.clone());
let mut reader = RawDecoder::new(&[0x5, b'h', b'e', b'l', b'l', b'o']);

assert_eq!(reader.clone().read_var_string().unwrap(), "hello");
assert_eq!(reader.clone().read_var_buffer().unwrap().as_slice(), b"hello");
Expand All @@ -115,29 +165,21 @@ mod tests {
assert_eq!(reader.read_u8().unwrap(), b'o');
}
{
let mut reader = RawDecoder::new(vec![0x40, 0x49, 0x0f, 0xdb]);
let mut reader = RawDecoder::new(&[0x40, 0x49, 0x0f, 0xdb]);
assert_eq!(reader.read_f32_be().unwrap(), 3.1415927);
}
{
let mut reader = RawDecoder::new(vec![0x40, 0x09, 0x21, 0xfb, 0x54, 0x44, 0x2d, 0x18]);
let mut reader = RawDecoder::new(&[0x40, 0x09, 0x21, 0xfb, 0x54, 0x44, 0x2d, 0x18]);
assert_eq!(reader.read_f64_be().unwrap(), 3.141592653589793);
}
{
let mut reader = RawDecoder::new(vec![0x7f, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff]);
let mut reader = RawDecoder::new(&[0x7f, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff]);
assert_eq!(reader.read_i64_be().unwrap(), i64::MAX);
}
{
let mut reader = RawDecoder::new(vec![0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]);
let mut reader = RawDecoder::new(&[0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]);
assert_eq!(reader.read_i64_be().unwrap(), i64::MIN);
}
{
let mut reader = RawDecoder::new(vec![0x80]);
assert_eq!(reader.read_info().unwrap(), 0x80);
}
{
let mut reader = RawDecoder::new(vec![0x1, 0x2]);
assert_eq!(reader.read_item_id().unwrap(), Id::new(1, 2));
}
}

#[test]
Expand Down
83 changes: 23 additions & 60 deletions y-octo/src/doc/codec/io/reader.rs
Original file line number Diff line number Diff line change
@@ -1,72 +1,35 @@
use std::io::{Cursor, Error};

use byteorder::{BigEndian, ReadBytesExt};
use std::io::Error;

use super::*;

#[inline]
fn read_with_cursor<T, F>(buffer: &mut Cursor<Vec<u8>>, f: F) -> JwstCodecResult<T>
where
F: FnOnce(&[u8]) -> IResult<&[u8], T>,
{
// TODO: use remaining_slice() instead after it is stabilized
let input = buffer.get_ref();
let rest_pos = buffer.position().min(input.len() as u64) as usize;
let input = &input[rest_pos..];

let (tail, result) = f(input).map_err(|e| e.map_input(|u| u.len()))?;

buffer.set_position((rest_pos + input.len() - tail.len()) as u64);
Ok(result)
}

#[inline]
fn map_io_error(e: Error) -> JwstCodecError {
pub fn map_read_error(e: Error) -> JwstCodecError {
JwstCodecError::IncompleteDocument(e.to_string())
}

pub trait CrdtReader {
// basic read functions
fn get_buffer(&self) -> &Cursor<Vec<u8>>;
fn get_buffer_mut(&mut self) -> &mut Cursor<Vec<u8>>;
fn read_var_u64(&mut self) -> JwstCodecResult<u64> {
read_with_cursor(self.get_buffer_mut(), read_var_u64)
}
fn read_var_i32(&mut self) -> JwstCodecResult<i32> {
read_with_cursor(self.get_buffer_mut(), read_var_i32)
}
fn read_var_string(&mut self) -> JwstCodecResult<String> {
read_with_cursor(self.get_buffer_mut(), read_var_string)
fn is_empty(&self) -> bool;
fn len(&self) -> u64;
fn read_var_u64(&mut self) -> JwstCodecResult<u64>;
fn read_var_i32(&mut self) -> JwstCodecResult<i32>;
fn read_var_string(&mut self) -> JwstCodecResult<String>;
fn read_var_buffer(&mut self) -> JwstCodecResult<Vec<u8>>;
fn read_u8(&mut self) -> JwstCodecResult<u8>;
fn read_f32_be(&mut self) -> JwstCodecResult<f32>;
fn read_f64_be(&mut self) -> JwstCodecResult<f64>;
fn read_i64_be(&mut self) -> JwstCodecResult<i64>;

#[inline(always)]
fn read_info(&mut self) -> JwstCodecResult<u8> {
self.read_u8()
}

#[inline(always)]
fn read_item_id(&mut self) -> JwstCodecResult<Id> {
let client = self.read_var_u64()?;
let clock = self.read_var_u64()?;
Ok(Id::new(client, clock))
}
fn read_var_buffer(&mut self) -> JwstCodecResult<Vec<u8>> {
read_with_cursor(self.get_buffer_mut(), |i| {
read_var_buffer(i).map(|(tail, val)| (tail, val.to_vec()))
})
}
fn read_u8(&mut self) -> JwstCodecResult<u8> {
self.get_buffer_mut().read_u8().map_err(map_io_error)
}
fn read_f32_be(&mut self) -> JwstCodecResult<f32> {
self.get_buffer_mut().read_f32::<BigEndian>().map_err(map_io_error)
}
fn read_f64_be(&mut self) -> JwstCodecResult<f64> {
self.get_buffer_mut().read_f64::<BigEndian>().map_err(map_io_error)
}
fn read_i64_be(&mut self) -> JwstCodecResult<i64> {
self.get_buffer_mut().read_i64::<BigEndian>().map_err(map_io_error)
}
fn is_empty(&self) -> bool {
let buffer = self.get_buffer();
buffer.position() >= buffer.get_ref().len() as u64
}
fn len(&self) -> u64 {
let buffer = self.get_buffer();
buffer.get_ref().len() as u64 - buffer.position()
}

// ydoc specific read functions
fn read_info(&mut self) -> JwstCodecResult<u8>;
fn read_item_id(&mut self) -> JwstCodecResult<Id>;
}

pub trait CrdtRead<R: CrdtReader> {
Expand Down
Loading

0 comments on commit 65fce2c

Please sign in to comment.