diff --git a/core/src/services/ftp/backend.rs b/core/src/services/ftp/backend.rs index b5ddda58126..9888d35727b 100644 --- a/core/src/services/ftp/backend.rs +++ b/core/src/services/ftp/backend.rs @@ -42,6 +42,7 @@ use suppaftp::ImplAsyncFtpStream; use suppaftp::Status; use tokio::sync::OnceCell; +use super::err::parse_error; use super::lister::FtpLister; use super::util::FtpReader; use super::writer::FtpWriter; @@ -334,7 +335,7 @@ impl Accessor for FtpBackend { })) | Ok(()) => (), Err(e) => { - return Err(e.into()); + return Err(parse_error(e)); } } } @@ -351,24 +352,35 @@ impl Accessor for FtpBackend { let br = args.range(); let r: Box = match (br.offset(), br.size()) { (Some(offset), Some(size)) => { - ftp_stream.resume_transfer(offset as usize).await?; - let ds = ftp_stream.retr_as_stream(path).await?.take(size); + ftp_stream + .resume_transfer(offset as usize) + .await + .map_err(parse_error)?; + let ds = ftp_stream + .retr_as_stream(path) + .await + .map_err(parse_error)? + .take(size); Box::new(ds) } (Some(offset), None) => { - ftp_stream.resume_transfer(offset as usize).await?; - let ds = ftp_stream.retr_as_stream(path).await?; + ftp_stream + .resume_transfer(offset as usize) + .await + .map_err(parse_error)?; + let ds = ftp_stream.retr_as_stream(path).await.map_err(parse_error)?; Box::new(ds) } (None, Some(size)) => { ftp_stream .resume_transfer((meta.size() as u64 - size) as usize) - .await?; - let ds = ftp_stream.retr_as_stream(path).await?; + .await + .map_err(parse_error)?; + let ds = ftp_stream.retr_as_stream(path).await.map_err(parse_error)?; Box::new(ds) } (None, None) => { - let ds = ftp_stream.retr_as_stream(path).await?; + let ds = ftp_stream.retr_as_stream(path).await.map_err(parse_error)?; Box::new(ds) } }; @@ -384,6 +396,7 @@ impl Accessor for FtpBackend { // TODO: we can optimize this by checking dir existence first. let mut ftp_stream = self.ftp_connect(Operation::Write).await?; let mut curr_path = String::new(); + for path in paths { curr_path.push_str(path); match ftp_stream.mkdir(&curr_path).await { @@ -394,7 +407,7 @@ impl Accessor for FtpBackend { })) | Ok(()) => (), Err(e) => { - return Err(e.into()); + return Err(parse_error(e)); } } } @@ -439,7 +452,7 @@ impl Accessor for FtpBackend { })) | Ok(_) => (), Err(e) => { - return Err(e.into()); + return Err(parse_error(e)); } } @@ -450,7 +463,7 @@ impl Accessor for FtpBackend { let mut ftp_stream = self.ftp_connect(Operation::List).await?; let pathname = if path == "/" { None } else { Some(path) }; - let files = ftp_stream.list(pathname).await?; + let files = ftp_stream.list(pathname).await.map_err(parse_error)?; Ok(( RpList::default(), @@ -475,10 +488,11 @@ impl FtpBackend { }) .await }) - .await?; + .await + .map_err(parse_error)?; pool.get_owned().await.map_err(|err| match err { - RunError::User(err) => err.into(), + RunError::User(err) => parse_error(err), RunError::TimedOut => { Error::new(ErrorKind::Unexpected, "connection request: timeout").set_temporary() } @@ -492,7 +506,7 @@ impl FtpBackend { let pathname = if parent == "/" { None } else { Some(parent) }; - let resp = ftp_stream.list(pathname).await?; + let resp = ftp_stream.list(pathname).await.map_err(parse_error)?; // Get stat of file. let mut files = resp diff --git a/core/src/services/ftp/err.rs b/core/src/services/ftp/err.rs index 4511ec6a61a..e9eba974f98 100644 --- a/core/src/services/ftp/err.rs +++ b/core/src/services/ftp/err.rs @@ -21,29 +21,27 @@ use suppaftp::Status; use crate::Error; use crate::ErrorKind; -impl From for Error { - fn from(e: FtpError) -> Self { - let (kind, retryable) = match e { - // Allow retry for error - // - // `{ status: NotAvailable, body: "421 There are too many connections from your internet address." }` - FtpError::UnexpectedResponse(ref resp) if resp.status == Status::NotAvailable => { - (ErrorKind::Unexpected, true) - } - FtpError::UnexpectedResponse(ref resp) if resp.status == Status::FileUnavailable => { - (ErrorKind::NotFound, false) - } - // Allow retry bad response. - FtpError::BadResponse => (ErrorKind::Unexpected, true), - _ => (ErrorKind::Unexpected, false), - }; - - let mut err = Error::new(kind, "ftp error").set_source(e); - - if retryable { - err = err.set_temporary(); +pub fn parse_error(err: FtpError) -> Error { + let (kind, retryable) = match err { + // Allow retry for error + // + // `{ status: NotAvailable, body: "421 There are too many connections from your internet address." }` + FtpError::UnexpectedResponse(ref resp) if resp.status == Status::NotAvailable => { + (ErrorKind::Unexpected, true) + } + FtpError::UnexpectedResponse(ref resp) if resp.status == Status::FileUnavailable => { + (ErrorKind::NotFound, false) } + // Allow retry bad response. + FtpError::BadResponse => (ErrorKind::Unexpected, true), + _ => (ErrorKind::Unexpected, false), + }; - err + let mut err = Error::new(kind, "ftp error").set_source(err); + + if retryable { + err = err.set_temporary(); } + + err } diff --git a/core/src/services/ftp/util.rs b/core/src/services/ftp/util.rs index 82d250e13ca..15d3ced8047 100644 --- a/core/src/services/ftp/util.rs +++ b/core/src/services/ftp/util.rs @@ -29,6 +29,7 @@ use suppaftp::Status; use super::backend::Manager; use crate::raw::*; +use crate::services::ftp::err::parse_error; use crate::*; /// Wrapper for ftp data stream and command stream. @@ -75,7 +76,8 @@ impl oio::Read for FtpReader { Status::ClosingDataConnection, Status::RequestedFileActionOk, ]) - .await?; + .await + .map_err(parse_error)?; Ok(()) }; diff --git a/core/src/services/ftp/writer.rs b/core/src/services/ftp/writer.rs index 79ae8e55d17..37ce8dd9eb1 100644 --- a/core/src/services/ftp/writer.rs +++ b/core/src/services/ftp/writer.rs @@ -21,6 +21,7 @@ use futures::AsyncWriteExt; use super::backend::FtpBackend; use crate::raw::oio::WriteBuf; use crate::raw::*; +use crate::services::ftp::err::parse_error; use crate::*; pub type FtpWriters = oio::OneShotWriter; @@ -53,12 +54,18 @@ impl oio::OneShotWrite for FtpWriter { let bs = bs.bytes(size); let mut ftp_stream = self.backend.ftp_connect(Operation::Write).await?; - let mut data_stream = ftp_stream.append_with_stream(&self.path).await?; + let mut data_stream = ftp_stream + .append_with_stream(&self.path) + .await + .map_err(parse_error)?; data_stream.write_all(&bs).await.map_err(|err| { Error::new(ErrorKind::Unexpected, "copy from ftp stream").set_source(err) })?; - ftp_stream.finalize_put_stream(data_stream).await?; + ftp_stream + .finalize_put_stream(data_stream) + .await + .map_err(parse_error)?; Ok(()) } }