Skip to content

Commit

Permalink
refactor(services/ftp): Impl parse_error instead of From<Error> (#3891)
Browse files Browse the repository at this point in the history
* refactor(services/ftp):Don't impl From<XxxError> for Error

Signed-off-by: bokket <[email protected]>

* fix ftp error [505]

Signed-off-by: bokket <[email protected]>

---------

Signed-off-by: bokket <[email protected]>
  • Loading branch information
bokket committed Jan 1, 2024
1 parent 5ec1dce commit e925b4b
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 39 deletions.
42 changes: 28 additions & 14 deletions core/src/services/ftp/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use suppaftp::ImplAsyncFtpStream;
use suppaftp::Status;
use tokio::sync::OnceCell;

use super::err::parse_error;
use super::lister::FtpLister;
use super::util::FtpReader;
use super::writer::FtpWriter;
Expand Down Expand Up @@ -334,7 +335,7 @@ impl Accessor for FtpBackend {
}))
| Ok(()) => (),
Err(e) => {
return Err(e.into());
return Err(parse_error(e));
}
}
}
Expand All @@ -351,24 +352,35 @@ impl Accessor for FtpBackend {
let br = args.range();
let r: Box<dyn AsyncRead + Send + Unpin> = match (br.offset(), br.size()) {
(Some(offset), Some(size)) => {
ftp_stream.resume_transfer(offset as usize).await?;
let ds = ftp_stream.retr_as_stream(path).await?.take(size);
ftp_stream
.resume_transfer(offset as usize)
.await
.map_err(parse_error)?;
let ds = ftp_stream
.retr_as_stream(path)
.await
.map_err(parse_error)?
.take(size);
Box::new(ds)
}
(Some(offset), None) => {
ftp_stream.resume_transfer(offset as usize).await?;
let ds = ftp_stream.retr_as_stream(path).await?;
ftp_stream
.resume_transfer(offset as usize)
.await
.map_err(parse_error)?;
let ds = ftp_stream.retr_as_stream(path).await.map_err(parse_error)?;
Box::new(ds)
}
(None, Some(size)) => {
ftp_stream
.resume_transfer((meta.size() as u64 - size) as usize)
.await?;
let ds = ftp_stream.retr_as_stream(path).await?;
.await
.map_err(parse_error)?;
let ds = ftp_stream.retr_as_stream(path).await.map_err(parse_error)?;
Box::new(ds)
}
(None, None) => {
let ds = ftp_stream.retr_as_stream(path).await?;
let ds = ftp_stream.retr_as_stream(path).await.map_err(parse_error)?;
Box::new(ds)
}
};
Expand All @@ -384,6 +396,7 @@ impl Accessor for FtpBackend {
// TODO: we can optimize this by checking dir existence first.
let mut ftp_stream = self.ftp_connect(Operation::Write).await?;
let mut curr_path = String::new();

for path in paths {
curr_path.push_str(path);
match ftp_stream.mkdir(&curr_path).await {
Expand All @@ -394,7 +407,7 @@ impl Accessor for FtpBackend {
}))
| Ok(()) => (),
Err(e) => {
return Err(e.into());
return Err(parse_error(e));
}
}
}
Expand Down Expand Up @@ -439,7 +452,7 @@ impl Accessor for FtpBackend {
}))
| Ok(_) => (),
Err(e) => {
return Err(e.into());
return Err(parse_error(e));
}
}

Expand All @@ -450,7 +463,7 @@ impl Accessor for FtpBackend {
let mut ftp_stream = self.ftp_connect(Operation::List).await?;

let pathname = if path == "/" { None } else { Some(path) };
let files = ftp_stream.list(pathname).await?;
let files = ftp_stream.list(pathname).await.map_err(parse_error)?;

Ok((
RpList::default(),
Expand All @@ -475,10 +488,11 @@ impl FtpBackend {
})
.await
})
.await?;
.await
.map_err(parse_error)?;

pool.get_owned().await.map_err(|err| match err {
RunError::User(err) => err.into(),
RunError::User(err) => parse_error(err),
RunError::TimedOut => {
Error::new(ErrorKind::Unexpected, "connection request: timeout").set_temporary()
}
Expand All @@ -492,7 +506,7 @@ impl FtpBackend {

let pathname = if parent == "/" { None } else { Some(parent) };

let resp = ftp_stream.list(pathname).await?;
let resp = ftp_stream.list(pathname).await.map_err(parse_error)?;

// Get stat of file.
let mut files = resp
Expand Down
42 changes: 20 additions & 22 deletions core/src/services/ftp/err.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,29 +21,27 @@ use suppaftp::Status;
use crate::Error;
use crate::ErrorKind;

impl From<FtpError> for Error {
fn from(e: FtpError) -> Self {
let (kind, retryable) = match e {
// Allow retry for error
//
// `{ status: NotAvailable, body: "421 There are too many connections from your internet address." }`
FtpError::UnexpectedResponse(ref resp) if resp.status == Status::NotAvailable => {
(ErrorKind::Unexpected, true)
}
FtpError::UnexpectedResponse(ref resp) if resp.status == Status::FileUnavailable => {
(ErrorKind::NotFound, false)
}
// Allow retry bad response.
FtpError::BadResponse => (ErrorKind::Unexpected, true),
_ => (ErrorKind::Unexpected, false),
};

let mut err = Error::new(kind, "ftp error").set_source(e);

if retryable {
err = err.set_temporary();
pub fn parse_error(err: FtpError) -> Error {
let (kind, retryable) = match err {
// Allow retry for error
//
// `{ status: NotAvailable, body: "421 There are too many connections from your internet address." }`
FtpError::UnexpectedResponse(ref resp) if resp.status == Status::NotAvailable => {
(ErrorKind::Unexpected, true)
}
FtpError::UnexpectedResponse(ref resp) if resp.status == Status::FileUnavailable => {
(ErrorKind::NotFound, false)
}
// Allow retry bad response.
FtpError::BadResponse => (ErrorKind::Unexpected, true),
_ => (ErrorKind::Unexpected, false),
};

err
let mut err = Error::new(kind, "ftp error").set_source(err);

if retryable {
err = err.set_temporary();
}

err
}
4 changes: 3 additions & 1 deletion core/src/services/ftp/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use suppaftp::Status;

use super::backend::Manager;
use crate::raw::*;
use crate::services::ftp::err::parse_error;
use crate::*;

/// Wrapper for ftp data stream and command stream.
Expand Down Expand Up @@ -75,7 +76,8 @@ impl oio::Read for FtpReader {
Status::ClosingDataConnection,
Status::RequestedFileActionOk,
])
.await?;
.await
.map_err(parse_error)?;

Ok(())
};
Expand Down
11 changes: 9 additions & 2 deletions core/src/services/ftp/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use futures::AsyncWriteExt;
use super::backend::FtpBackend;
use crate::raw::oio::WriteBuf;
use crate::raw::*;
use crate::services::ftp::err::parse_error;
use crate::*;

pub type FtpWriters = oio::OneShotWriter<FtpWriter>;
Expand Down Expand Up @@ -53,12 +54,18 @@ impl oio::OneShotWrite for FtpWriter {
let bs = bs.bytes(size);

let mut ftp_stream = self.backend.ftp_connect(Operation::Write).await?;
let mut data_stream = ftp_stream.append_with_stream(&self.path).await?;
let mut data_stream = ftp_stream
.append_with_stream(&self.path)
.await
.map_err(parse_error)?;
data_stream.write_all(&bs).await.map_err(|err| {
Error::new(ErrorKind::Unexpected, "copy from ftp stream").set_source(err)
})?;

ftp_stream.finalize_put_stream(data_stream).await?;
ftp_stream
.finalize_put_stream(data_stream)
.await
.map_err(parse_error)?;
Ok(())
}
}

0 comments on commit e925b4b

Please sign in to comment.