Skip to content

Commit

Permalink
feat: Avoid the extra stat request when reading to the end
Browse files Browse the repository at this point in the history
Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo committed Sep 28, 2024
1 parent 43e4d7d commit 93076ee
Show file tree
Hide file tree
Showing 6 changed files with 164 additions and 126 deletions.
10 changes: 5 additions & 5 deletions core/src/types/blocking_read/buffer_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::ops::Range;
use std::ops::RangeBounds;
use std::sync::Arc;

use crate::raw::*;
Expand All @@ -30,8 +30,8 @@ struct IteratingReader {
impl IteratingReader {
/// Create a new iterating reader.
#[inline]
fn new(ctx: Arc<ReadContext>, range: Range<u64>) -> Self {
let generator = ReadGenerator::new(ctx.clone(), range);
fn new(ctx: Arc<ReadContext>, range: BytesRange) -> Self {
let generator = ReadGenerator::new(ctx.clone(), range.offset(), range.size());
Self {
generator,
reader: None,
Expand Down Expand Up @@ -73,9 +73,9 @@ pub struct BufferIterator {
impl BufferIterator {
/// Create a new buffer iterator.
#[inline]
pub fn new(ctx: Arc<ReadContext>, range: Range<u64>) -> Self {
pub fn new(ctx: Arc<ReadContext>, range: impl RangeBounds<u64>) -> Self {
Self {
inner: IteratingReader::new(ctx, range),
inner: IteratingReader::new(ctx, range.into()),
}
}
}
Expand Down
139 changes: 100 additions & 39 deletions core/src/types/context/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::ops::Range;
use std::ops::{Bound, Range, RangeBounds};
use std::sync::Arc;

use crate::raw::*;
Expand Down Expand Up @@ -68,6 +68,38 @@ impl ReadContext {
pub fn options(&self) -> &OpReader {
&self.options
}

/// Resolve user-supplied range bounds into a concrete `start..end` range.
///
/// An unbounded end is the only case that requires knowing the object's
/// total length, so only then do we issue a `stat` request (propagating
/// the requested read version, if any).
pub(crate) async fn parse_into_range(
    &self,
    range: impl RangeBounds<u64>,
) -> Result<Range<u64>> {
    let start = match range.start_bound() {
        Bound::Unbounded => 0,
        Bound::Included(&v) => v,
        Bound::Excluded(&v) => v + 1,
    };

    let end = match range.end_bound() {
        Bound::Included(&v) => v + 1,
        Bound::Excluded(&v) => v,
        Bound::Unbounded => {
            // Fetch the content length so the open-ended range can be closed.
            let mut op_stat = OpStat::new();
            if let Some(version) = self.args().version() {
                op_stat = op_stat.with_version(version);
            }

            let meta = self
                .accessor()
                .stat(self.path(), op_stat)
                .await?
                .into_metadata();
            meta.content_length()
        }
    };

    Ok(start..end)
}
}

/// ReadGenerator is used to generate new readers.
Expand All @@ -83,62 +115,65 @@ pub struct ReadGenerator {
ctx: Arc<ReadContext>,

offset: u64,
end: u64,
size: Option<u64>,
}

impl ReadGenerator {
/// Create a new ReadGenerator.
#[inline]
pub fn new(ctx: Arc<ReadContext>, range: Range<u64>) -> Self {
Self {
ctx,
offset: range.start,
end: range.end,
pub fn new(ctx: Arc<ReadContext>, offset: u64, size: Option<u64>) -> Self {
Self { ctx, offset, size }
}

/// Generate next range to read.
///
/// Returns `None` once all data has been handed out. When the total size
/// is unknown, a single unbounded range is produced and the generator is
/// exhausted afterwards.
fn next_range(&mut self) -> Option<BytesRange> {
    if self.size == Some(0) {
        // Nothing left to read.
        return None;
    }

    let offset = self.offset;
    let length = match self.size {
        None => {
            // Unknown total size: emit one read-to-end range, then mark
            // the generator as exhausted.
            self.size = Some(0);
            None
        }
        Some(remaining) => {
            // Honor the configured chunk size, if any, otherwise take
            // everything that remains in one go.
            let step = match self.ctx.options.chunk() {
                Some(chunk) => remaining.min(chunk as u64),
                None => remaining,
            };
            // Advance the cursor before the range is handed out.
            self.offset += step;
            self.size = Some(remaining - step);
            Some(step)
        }
    };

    Some(BytesRange::new(offset, length))
}

/// Generate next reader.
pub async fn next_reader(&mut self) -> Result<Option<oio::Reader>> {
if self.offset >= self.end {
let Some(range) = self.next_range() else {
return Ok(None);
}
};

let offset = self.offset;
let mut size = (self.end - self.offset) as usize;
if let Some(chunk) = self.ctx.options.chunk() {
size = size.min(chunk)
}

// Update self.offset before building future.
self.offset += size as u64;
let args = self
.ctx
.args
.clone()
.with_range(BytesRange::new(offset, Some(size as u64)));
let args = self.ctx.args.clone().with_range(range);
let (_, r) = self.ctx.acc.read(&self.ctx.path, args).await?;
Ok(Some(r))
}

/// Generate next blocking reader.
pub fn next_blocking_reader(&mut self) -> Result<Option<oio::BlockingReader>> {
if self.offset >= self.end {
let Some(range) = self.next_range() else {
return Ok(None);
}
};

let offset = self.offset;
let mut size = (self.end - self.offset) as usize;
if let Some(chunk) = self.ctx.options.chunk() {
size = size.min(chunk)
}

// Update self.offset before building future.
self.offset += size as u64;
let args = self
.ctx
.args
.clone()
.with_range(BytesRange::new(offset, Some(size as u64)));
let args = self.ctx.args.clone().with_range(range);
let (_, r) = self.ctx.acc.blocking_read(&self.ctx.path, args)?;
Ok(Some(r))
}
Expand Down Expand Up @@ -167,7 +202,7 @@ mod tests {
OpRead::new(),
OpReader::new().with_chunk(3),
));
let mut generator = ReadGenerator::new(ctx, 0..10);
let mut generator = ReadGenerator::new(ctx, 0, Some(10));
let mut readers = vec![];
while let Some(r) = generator.next_reader().await? {
readers.push(r);
Expand All @@ -177,6 +212,32 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn test_next_reader_without_size() -> Result<()> {
    // Prepare a 10-byte object assembled from two buffer fragments.
    let op = Operator::via_iter(Scheme::Memory, [])?;
    let payload = Buffer::from(vec![Bytes::from("Hello"), Bytes::from("World")]);
    op.write("test", payload).await?;

    let acc = op.into_inner();
    let ctx = Arc::new(ReadContext::new(
        acc,
        "test".to_string(),
        OpRead::new(),
        OpReader::new().with_chunk(3),
    ));

    // With an unknown size the generator must fall back to a single
    // read-to-end reader, ignoring the configured chunk size.
    let mut generator = ReadGenerator::new(ctx, 0, None);
    let mut readers = Vec::new();
    while let Some(r) = generator.next_reader().await? {
        readers.push(r);
    }

    pretty_assertions::assert_eq!(readers.len(), 1);
    Ok(())
}

#[test]
fn test_next_blocking_reader() -> Result<()> {
let op = Operator::via_iter(Scheme::Memory, [])?;
Expand All @@ -192,7 +253,7 @@ mod tests {
OpRead::new(),
OpReader::new().with_chunk(3),
));
let mut generator = ReadGenerator::new(ctx, 0..10);
let mut generator = ReadGenerator::new(ctx, 0, Some(10));
let mut readers = vec![];
while let Some(r) = generator.next_blocking_reader()? {
readers.push(r);
Expand Down
49 changes: 36 additions & 13 deletions core/src/types/read/buffer_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::ops::Range;
use std::ops::RangeBounds;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
Expand All @@ -40,8 +40,8 @@ pub struct StreamingReader {
impl StreamingReader {
/// Create a new streaming reader.
#[inline]
fn new(ctx: Arc<ReadContext>, range: Range<u64>) -> Self {
let generator = ReadGenerator::new(ctx.clone(), range);
fn new(ctx: Arc<ReadContext>, range: BytesRange) -> Self {
let generator = ReadGenerator::new(ctx.clone(), range.offset(), range.size());
Self {
generator,
reader: None,
Expand Down Expand Up @@ -86,7 +86,7 @@ impl ChunkedReader {
/// # Notes
///
/// We don't need to handle `Executor::timeout` since we are outside of the layer.
fn new(ctx: Arc<ReadContext>, range: Range<u64>) -> Self {
fn new(ctx: Arc<ReadContext>, range: BytesRange) -> Self {
let tasks = ConcurrentTasks::new(
ctx.args().executor().cloned().unwrap_or_default(),
ctx.options().concurrent(),
Expand All @@ -99,7 +99,7 @@ impl ChunkedReader {
})
},
);
let generator = ReadGenerator::new(ctx, range);
let generator = ReadGenerator::new(ctx, range.offset(), range.size());
Self {
generator,
tasks,
Expand Down Expand Up @@ -144,17 +144,40 @@ enum State {
}

impl BufferStream {
/// Create a new buffer stream.
pub fn new(ctx: Arc<ReadContext>, range: Range<u64>) -> Self {
/// Create a new buffer stream with already calculated offset and size.
///
/// Callers that only have range bounds (and possibly no known size)
/// should use [`BufferStream::create`] instead, which resolves the size
/// when chunked reading requires it.
pub fn new(ctx: Arc<ReadContext>, offset: u64, size: Option<u64>) -> Self {
    // Chunked reading splits the read into fixed-size sub-ranges, which
    // is only possible when the total size is known up front.
    debug_assert!(
        size.is_some() || ctx.options().chunk().is_none(),
        "size must be known if chunk is set"
    );

    let reader = if ctx.options().chunk().is_some() {
        TwoWays::Two(ChunkedReader::new(ctx, BytesRange::new(offset, size)))
    } else {
        // No chunking configured: stream the range with a single reader chain.
        TwoWays::One(StreamingReader::new(ctx, BytesRange::new(offset, size)))
    };

    Self {
        state: State::Idle(Some(reader)),
    }
}

/// Create a new buffer stream from the given range bounds.
///
/// Chunked reading needs a concrete size up front, so when a chunk size
/// is configured the bounds are first resolved into an exact range
/// (which may issue a `stat` request for an unbounded end).
pub async fn create(ctx: Arc<ReadContext>, range: impl RangeBounds<u64>) -> Result<Self> {
    let reader = match ctx.options().chunk() {
        Some(_) => {
            // Resolve the bounds so the chunked reader knows the total size.
            let resolved = ctx.parse_into_range(range).await?;
            TwoWays::Two(ChunkedReader::new(ctx, resolved.into()))
        }
        // Streaming reads work fine with open-ended bounds.
        None => TwoWays::One(StreamingReader::new(ctx, range.into())),
    };

    Ok(Self {
        state: State::Idle(Some(reader)),
    })
}
}

impl Stream for BufferStream {
Expand Down Expand Up @@ -198,16 +221,16 @@ mod tests {

use super::*;

#[test]
fn test_trait() -> Result<()> {
#[tokio::test]
async fn test_trait() -> Result<()> {
let acc = Operator::via_iter(Scheme::Memory, [])?.into_inner();
let ctx = Arc::new(ReadContext::new(
acc,
"test".to_string(),
OpRead::new(),
OpReader::new(),
));
let v = BufferStream::new(ctx, 4..8);
let v = BufferStream::create(ctx, 4..8).await?;

let _: Box<dyn Unpin + MaybeSend + 'static> = Box::new(v);

Expand All @@ -231,7 +254,7 @@ mod tests {
OpReader::new(),
));

let s = BufferStream::new(ctx, 4..8);
let s = BufferStream::create(ctx, 4..8).await?;
let bufs: Vec<_> = s.try_collect().await.unwrap();
assert_eq!(bufs.len(), 1);
assert_eq!(bufs[0].chunk(), "o".as_bytes());
Expand Down
12 changes: 8 additions & 4 deletions core/src/types/read/futures_async_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl FuturesAsyncReader {
#[inline]
pub(super) fn new(ctx: Arc<ReadContext>, range: Range<u64>) -> Self {
let (start, end) = (range.start, range.end);
let stream = BufferStream::new(ctx.clone(), range);
let stream = BufferStream::new(ctx.clone(), start, Some(end - start));

FuturesAsyncReader {
ctx,
Expand Down Expand Up @@ -157,7 +157,11 @@ impl AsyncSeek for FuturesAsyncReader {
self.buf.advance(cnt as _);
} else {
self.buf = Buffer::new();
self.stream = BufferStream::new(self.ctx.clone(), new_pos + self.start..self.end);
self.stream = BufferStream::new(
self.ctx.clone(),
new_pos + self.start,
Some(self.end - self.start - new_pos),
);
}

self.pos = new_pos;
Expand All @@ -177,8 +181,8 @@ mod tests {

use super::*;

#[test]
fn test_trait() -> Result<()> {
#[tokio::test]
async fn test_trait() -> Result<()> {
let acc = Operator::via_iter(Scheme::Memory, [])?.into_inner();
let ctx = Arc::new(ReadContext::new(
acc,
Expand Down
Loading

0 comments on commit 93076ee

Please sign in to comment.