diff --git a/core/src/raw/oio/read/buffer_reader.rs b/core/src/raw/oio/read/buffer_reader.rs index 95676d2b692..449f93e0102 100644 --- a/core/src/raw/oio/read/buffer_reader.rs +++ b/core/src/raw/oio/read/buffer_reader.rs @@ -151,7 +151,13 @@ where if self.pos == self.filled && dst.len() >= self.capacity() { let res = ready!(self.r.poll_read(cx, dst)); self.discard_buffer(); - return Poll::Ready(res); + return match res { + Ok(nread) => { + self.cur += nread as u64; + Poll::Ready(Ok(nread)) + } + Err(err) => Poll::Ready(Err(err)), + }; } let rem = ready!(self.poll_fill_buf(cx))?; @@ -252,7 +258,13 @@ where if self.pos == self.filled && dst.len() >= self.capacity() { let res = self.r.read(dst); self.discard_buffer(); - return res; + return match res { + Ok(nread) => { + self.cur += nread as u64; + Ok(nread) + } + Err(err) => Err(err), + }; } let rem = self.fill_buf()?; @@ -552,6 +564,33 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_bypass_read_and_seek_relative() -> anyhow::Result<()> { + let bs = Bytes::copy_from_slice( + &b"Hello, World! I'm going to tests a seek relative related bug!"[..], + ); + let acc = Arc::new(MockReadService::new(bs.clone())); + let r = Box::new(RangeReader::new( + acc, + "x", + OpRead::default().with_range(BytesRange::from(..)), + )) as oio::Reader; + let mut r = Box::new(BufferReader::new(r, 5)) as oio::Reader; + + let mut cur = 0; + for _ in 0..3 { + let mut dst = [0u8; 6]; + let nread = r.read(&mut dst).await?; + assert_eq!(nread, 6); + cur += 6; + } + + let ret_cur = r.seek(SeekFrom::Current(6)).await?; + assert_eq!(cur + 6, ret_cur); + + Ok(()) + } + #[tokio::test] async fn test_read_part() -> anyhow::Result<()> { let (bs, _) = gen_bytes(); @@ -726,6 +765,28 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_blocking_bypass_read_and_seek_relative() -> anyhow::Result<()> { + let bs = Bytes::copy_from_slice( + &b"Hello, World! I'm going to tests a seek relative related bug!"[..], + ); + let r = Box::new(oio::Cursor::from(bs.clone())) as oio::BlockingReader; + let mut r = Box::new(BufferReader::new(r, 5)) as oio::BlockingReader; + + let mut cur = 0; + for _ in 0..3 { + let mut dst = [0u8; 6]; + let nread = r.read(&mut dst)?; + assert_eq!(nread, 6); + cur += 6; + } + + let ret_cur = r.seek(SeekFrom::Current(6))?; + assert_eq!(cur + 6, ret_cur); + + Ok(()) + } + #[tokio::test] async fn test_blocking_read_part() -> anyhow::Result<()> { use std::io::Read;