From c611f34bb2b311bcdcabf9a7ed08c95018b7072a Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Sat, 28 Sep 2024 13:15:37 +0800 Subject: [PATCH] Fix sftp read Signed-off-by: Xuanwo --- core/src/services/sftp/backend.rs | 2 +- core/src/services/sftp/reader.rs | 43 +++++++------------------------ 2 files changed, 11 insertions(+), 34 deletions(-) diff --git a/core/src/services/sftp/backend.rs b/core/src/services/sftp/backend.rs index 48b87e0493b..8e3c103f786 100644 --- a/core/src/services/sftp/backend.rs +++ b/core/src/services/sftp/backend.rs @@ -404,7 +404,7 @@ impl Access for SftpBackend { Ok(( RpRead::default(), - SftpReader::new(client, f, args.range().size().unwrap_or(u64::MAX) as _), + SftpReader::new(client, f, args.range().size()), )) } diff --git a/core/src/services/sftp/reader.rs b/core/src/services/sftp/reader.rs index f007c3bba64..90a231472f2 100644 --- a/core/src/services/sftp/reader.rs +++ b/core/src/services/sftp/reader.rs @@ -30,17 +30,17 @@ pub struct SftpReader { file: File, chunk: usize, - size: usize, + size: Option, read: usize, buf: BytesMut, } impl SftpReader { - pub fn new(conn: PooledConnection<'static, Manager>, file: File, size: usize) -> Self { + pub fn new(conn: PooledConnection<'static, Manager>, file: File, size: Option) -> Self { Self { _conn: conn, file, - size, + size: size.map(|v| v as usize), chunk: 2 * 1024 * 1024, read: 0, buf: BytesMut::new(), @@ -50,35 +50,15 @@ impl SftpReader { impl oio::Read for SftpReader { async fn read(&mut self) -> Result { - // let client = self.inner.connect().await?; - // - // let mut fs = client.fs(); - // fs.set_cwd(&self.root); - // - // let path = fs - // .canonicalize(&self.path) - // .await - // .map_err(parse_sftp_error)?; - // - // let mut f = client - // .open(path.as_path()) - // .await - // .map_err(parse_sftp_error)?; - - // f.seek(SeekFrom::Start(offset)) - // .await - // .map_err(new_std_io_error)?; - - // let mut size = size; - // if size == 0 { - // return Ok(Buffer::new()); - // } - - if self.read >= self.size { + if Some(self.read) >= self.size { return Ok(Buffer::new()); } - let size = (self.size - self.read).min(self.chunk); + let size = if let Some(size) = self.size { + (size - self.read).min(self.chunk) + } else { + self.chunk + }; self.buf.reserve(size); let Some(bytes) = self @@ -87,10 +67,7 @@ impl oio::Read for SftpReader { .await .map_err(parse_sftp_error)? else { - return Err(Error::new( - ErrorKind::RangeNotSatisfied, - "sftp read file reaching EoF", - )); + return Ok(Buffer::new()); }; self.read += bytes.len();