Skip to content

Commit

Permalink
feat: adapt the CompleteReader (#3861)
Browse files Browse the repository at this point in the history
* feat: add `TwoWaysReader` and `FourWaysReader`

* refactor: refactor `CompleteReader`
  • Loading branch information
WenyXu authored Dec 30, 2023
1 parent ad8c51a commit 0402a2c
Show file tree
Hide file tree
Showing 3 changed files with 202 additions and 99 deletions.
138 changes: 39 additions & 99 deletions core/src/layers/complete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,22 @@
use std::cmp;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::io;
use std::sync::Arc;
use std::task::ready;
use std::task::Context;
use std::task::Poll;

use async_trait::async_trait;
use bytes::Bytes;

use crate::raw::oio::BufferReader;
use crate::raw::oio::FileReader;
use crate::raw::oio::FlatLister;
use crate::raw::oio::FourWaysReader;
use crate::raw::oio::LazyReader;
use crate::raw::oio::PrefixLister;
use crate::raw::oio::RangeReader;
use crate::raw::oio::StreamableReader;
use crate::raw::oio::TwoWaysReader;
use crate::raw::*;
use crate::*;

Expand Down Expand Up @@ -293,28 +294,35 @@ impl<A: Accessor> CompleteAccessor<A> {

let seekable = capability.read_can_seek;
let streamable = capability.read_can_next;
let buffer_cap = args.buffer();

match (seekable, streamable) {
let r = match (seekable, streamable) {
(true, true) => {
let r = LazyReader::new(self.inner.clone(), path, args);
Ok((RpRead::new(), CompleteReader::AlreadyComplete(r)))
InnerCompleteReader::One(r)
}
(true, false) => {
let r = FileReader::new(self.inner.clone(), path, args);

Ok((RpRead::new(), CompleteReader::NeedStreamable(r)))
InnerCompleteReader::Two(r)
}
_ => {
let r = RangeReader::new(self.inner.clone(), path, args);

if streamable {
Ok((RpRead::new(), CompleteReader::NeedSeekable(r)))
InnerCompleteReader::Three(r)
} else {
let r = oio::into_streamable_read(r, 256 * 1024);
Ok((RpRead::new(), CompleteReader::NeedBoth(r)))
InnerCompleteReader::Four(r)
}
}
}
};

let r = match buffer_cap {
None => CompleteReader::One(r),
Some(cap) => CompleteReader::Two(BufferReader::new(r, cap)),
};

Ok((RpRead::new(), r))
}

fn complete_blocking_read(
Expand All @@ -329,27 +337,35 @@ impl<A: Accessor> CompleteAccessor<A> {

let seekable = capability.read_can_seek;
let streamable = capability.read_can_next;
let buffer_cap = args.buffer();

match (seekable, streamable) {
let r = match (seekable, streamable) {
(true, true) => {
let r = LazyReader::new(self.inner.clone(), path, args);
Ok((RpRead::new(), CompleteReader::AlreadyComplete(r)))
InnerCompleteReader::One(r)
}
(true, false) => {
let r = FileReader::new(self.inner.clone(), path, args);
Ok((RpRead::new(), CompleteReader::NeedStreamable(r)))
InnerCompleteReader::Two(r)
}
_ => {
let r = RangeReader::new(self.inner.clone(), path, args);

if streamable {
Ok((RpRead::new(), CompleteReader::NeedSeekable(r)))
InnerCompleteReader::Three(r)
} else {
let r = oio::into_streamable_read(r, 256 * 1024);
Ok((RpRead::new(), CompleteReader::NeedBoth(r)))
InnerCompleteReader::Four(r)
}
}
}
};

let r = match buffer_cap {
None => CompleteReader::One(r),
Some(cap) => CompleteReader::Two(BufferReader::new(r, cap)),
};

Ok((RpRead::new(), r))
}

async fn complete_list(
Expand Down Expand Up @@ -659,91 +675,15 @@ impl<A: Accessor> LayeredAccessor for CompleteAccessor<A> {
}
}

pub enum CompleteReader<A: Accessor, R> {
AlreadyComplete(LazyReader<A, R>),
NeedSeekable(RangeReader<A, R>),
NeedStreamable(FileReader<A, R>),
NeedBoth(StreamableReader<RangeReader<A, R>>),
}

impl<A, R> oio::Read for CompleteReader<A, R>
where
A: Accessor<Reader = R>,
R: oio::Read,
{
#[inline]
fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
use CompleteReader::*;

match self {
AlreadyComplete(r) => r.poll_read(cx, buf),
NeedSeekable(r) => r.poll_read(cx, buf),
NeedStreamable(r) => r.poll_read(cx, buf),
NeedBoth(r) => r.poll_read(cx, buf),
}
}

fn poll_seek(&mut self, cx: &mut Context<'_>, pos: io::SeekFrom) -> Poll<Result<u64>> {
use CompleteReader::*;

match self {
AlreadyComplete(r) => r.poll_seek(cx, pos),
NeedSeekable(r) => r.poll_seek(cx, pos),
NeedStreamable(r) => r.poll_seek(cx, pos),
NeedBoth(r) => r.poll_seek(cx, pos),
}
}
pub type CompleteReader<A, R> =
TwoWaysReader<InnerCompleteReader<A, R>, BufferReader<InnerCompleteReader<A, R>>>;

fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
use CompleteReader::*;

match self {
AlreadyComplete(r) => r.poll_next(cx),
NeedSeekable(r) => r.poll_next(cx),
NeedStreamable(r) => r.poll_next(cx),
NeedBoth(r) => r.poll_next(cx),
}
}
}

impl<A, R> oio::BlockingRead for CompleteReader<A, R>
where
A: Accessor<BlockingReader = R>,
R: oio::BlockingRead,
{
fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
use CompleteReader::*;

match self {
AlreadyComplete(r) => r.read(buf),
NeedSeekable(r) => r.read(buf),
NeedStreamable(r) => r.read(buf),
NeedBoth(r) => r.read(buf),
}
}

fn seek(&mut self, pos: io::SeekFrom) -> Result<u64> {
use CompleteReader::*;

match self {
AlreadyComplete(r) => r.seek(pos),
NeedSeekable(r) => r.seek(pos),
NeedStreamable(r) => r.seek(pos),
NeedBoth(r) => r.seek(pos),
}
}

fn next(&mut self) -> Option<Result<Bytes>> {
use CompleteReader::*;

match self {
AlreadyComplete(r) => r.next(),
NeedSeekable(r) => r.next(),
NeedStreamable(r) => r.next(),
NeedBoth(r) => r.next(),
}
}
}
type InnerCompleteReader<A, R> = FourWaysReader<
LazyReader<A, R>,
FileReader<A, R>,
RangeReader<A, R>,
StreamableReader<RangeReader<A, R>>,
>;

pub enum CompleteLister<A: Accessor, P> {
AlreadyComplete(P),
Expand Down
159 changes: 159 additions & 0 deletions core/src/raw/oio/read/compose_read.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::{
io::SeekFrom,
task::{Context, Poll},
};

use crate::raw::*;

/// TwoWaysReader is used to implement [`Read`] based on two ways.
///
/// Users can wrap two different readers together.
pub enum TwoWaysReader<ONE, TWO> {
/// The first type for the [`TwoWaysReader`].
One(ONE),
/// The second type for the [`TwoWaysReader`].
Two(TWO),
}

impl<ONE: oio::Read, TWO: oio::Read> oio::Read for TwoWaysReader<ONE, TWO> {
fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<crate::Result<usize>> {
match self {
Self::One(one) => one.poll_read(cx, buf),
Self::Two(two) => two.poll_read(cx, buf),
}
}

fn poll_seek(&mut self, cx: &mut Context<'_>, pos: SeekFrom) -> Poll<crate::Result<u64>> {
match self {
Self::One(one) => one.poll_seek(cx, pos),
Self::Two(two) => two.poll_seek(cx, pos),
}
}

fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<crate::Result<bytes::Bytes>>> {
match self {
Self::One(one) => one.poll_next(cx),
Self::Two(two) => two.poll_next(cx),
}
}
}

impl<ONE: oio::BlockingRead, TWO: oio::BlockingRead> oio::BlockingRead for TwoWaysReader<ONE, TWO> {
fn read(&mut self, buf: &mut [u8]) -> crate::Result<usize> {
match self {
Self::One(one) => one.read(buf),
Self::Two(two) => two.read(buf),
}
}

fn seek(&mut self, pos: SeekFrom) -> crate::Result<u64> {
match self {
Self::One(one) => one.seek(pos),
Self::Two(two) => two.seek(pos),
}
}

fn next(&mut self) -> Option<crate::Result<bytes::Bytes>> {
match self {
Self::One(one) => one.next(),
Self::Two(two) => two.next(),
}
}
}

/// FourWaysReader is used to implement [`Read`] based on four ways.
///
/// Users can wrap four different readers together.
pub enum FourWaysReader<ONE, TWO, THREE, FOUR> {
/// The first type for the [`TwoWaysReader`].
One(ONE),
/// The second type for the [`TwoWaysReader`].
Two(TWO),
/// The third type for the [`TwoWaysReader`].
Three(THREE),
/// The fourth type for the [`TwoWaysReader`].
Four(FOUR),
}

impl<ONE: oio::Read, TWO: oio::Read, THREE: oio::Read, FOUR: oio::Read> oio::Read
for FourWaysReader<ONE, TWO, THREE, FOUR>
{
fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<crate::Result<usize>> {
match self {
Self::One(one) => one.poll_read(cx, buf),
Self::Two(two) => two.poll_read(cx, buf),
Self::Three(three) => three.poll_read(cx, buf),
Self::Four(four) => four.poll_read(cx, buf),
}
}

fn poll_seek(&mut self, cx: &mut Context<'_>, pos: SeekFrom) -> Poll<crate::Result<u64>> {
match self {
Self::One(one) => one.poll_seek(cx, pos),
Self::Two(two) => two.poll_seek(cx, pos),
Self::Three(three) => three.poll_seek(cx, pos),
Self::Four(four) => four.poll_seek(cx, pos),
}
}

fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<crate::Result<bytes::Bytes>>> {
match self {
Self::One(one) => one.poll_next(cx),
Self::Two(two) => two.poll_next(cx),
Self::Three(three) => three.poll_next(cx),
Self::Four(four) => four.poll_next(cx),
}
}
}

impl<
ONE: oio::BlockingRead,
TWO: oio::BlockingRead,
THREE: oio::BlockingRead,
FOUR: oio::BlockingRead,
> oio::BlockingRead for FourWaysReader<ONE, TWO, THREE, FOUR>
{
fn read(&mut self, buf: &mut [u8]) -> crate::Result<usize> {
match self {
Self::One(one) => one.read(buf),
Self::Two(two) => two.read(buf),
Self::Three(three) => three.read(buf),
Self::Four(four) => four.read(buf),
}
}

fn seek(&mut self, pos: SeekFrom) -> crate::Result<u64> {
match self {
Self::One(one) => one.seek(pos),
Self::Two(two) => two.seek(pos),
Self::Three(three) => three.seek(pos),
Self::Four(four) => four.seek(pos),
}
}

fn next(&mut self) -> Option<crate::Result<bytes::Bytes>> {
match self {
Self::One(one) => one.next(),
Self::Two(two) => two.next(),
Self::Three(three) => three.next(),
Self::Four(four) => four.next(),
}
}
}
4 changes: 4 additions & 0 deletions core/src/raw/oio/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,7 @@ pub use lazy_read::LazyReader;

mod buffer_reader;
pub use buffer_reader::BufferReader;

mod compose_read;
pub use compose_read::FourWaysReader;
pub use compose_read::TwoWaysReader;

0 comments on commit 0402a2c

Please sign in to comment.