Skip to content

Commit

Permalink
fix: fix the bypass read does not sync the cur of BufferReader (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu authored Dec 30, 2023
1 parent 3774ab0 commit 0e76e54
Showing 1 changed file with 63 additions and 2 deletions.
65 changes: 63 additions & 2 deletions core/src/raw/oio/read/buffer_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,13 @@ where
if self.pos == self.filled && dst.len() >= self.capacity() {
let res = ready!(self.r.poll_read(cx, dst));
self.discard_buffer();
return Poll::Ready(res);
return match res {
Ok(nread) => {
self.cur += nread as u64;
Poll::Ready(Ok(nread))
}
Err(err) => Poll::Ready(Err(err)),
};
}

let rem = ready!(self.poll_fill_buf(cx))?;
Expand Down Expand Up @@ -252,7 +258,13 @@ where
if self.pos == self.filled && dst.len() >= self.capacity() {
let res = self.r.read(dst);
self.discard_buffer();
return res;
return match res {
Ok(nread) => {
self.cur += nread as u64;
Ok(nread)
}
Err(err) => Err(err),
};
}

let rem = self.fill_buf()?;
Expand Down Expand Up @@ -552,6 +564,33 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn test_bypass_read_and_seek_relative() -> anyhow::Result<()> {
let bs = Bytes::copy_from_slice(
&b"Hello, World! I'm going to tests a seek relative related bug!"[..],
);
let acc = Arc::new(MockReadService::new(bs.clone()));
let r = Box::new(RangeReader::new(
acc,
"x",
OpRead::default().with_range(BytesRange::from(..)),
)) as oio::Reader;
let mut r = Box::new(BufferReader::new(r, 5)) as oio::Reader;

let mut cur = 0;
for _ in 0..3 {
let mut dst = [0u8; 6];
let nread = r.read(&mut dst).await?;
assert_eq!(nread, 6);
cur += 6;
}

let ret_cur = r.seek(SeekFrom::Current(6)).await?;
assert_eq!(cur + 6, ret_cur);

Ok(())
}

#[tokio::test]
async fn test_read_part() -> anyhow::Result<()> {
let (bs, _) = gen_bytes();
Expand Down Expand Up @@ -726,6 +765,28 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn test_blocking_bypass_read_and_seek_relative() -> anyhow::Result<()> {
let bs = Bytes::copy_from_slice(
&b"Hello, World! I'm going to tests a seek relative related bug!"[..],
);
let r = Box::new(oio::Cursor::from(bs.clone())) as oio::BlockingReader;
let mut r = Box::new(BufferReader::new(r, 5)) as oio::BlockingReader;

let mut cur = 0;
for _ in 0..3 {
let mut dst = [0u8; 6];
let nread = r.read(&mut dst)?;
assert_eq!(nread, 6);
cur += 6;
}

let ret_cur = r.seek(SeekFrom::Current(6))?;
assert_eq!(cur + 6, ret_cur);

Ok(())
}

#[tokio::test]
async fn test_blocking_read_part() -> anyhow::Result<()> {
use std::io::Read;
Expand Down

0 comments on commit 0e76e54

Please sign in to comment.