Skip to content

Commit

Permalink
Fix sftp read
Browse files Browse the repository at this point in the history
Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo committed Sep 28, 2024
1 parent 93076ee commit c611f34
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 34 deletions.
2 changes: 1 addition & 1 deletion core/src/services/sftp/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ impl Access for SftpBackend {

Ok((
RpRead::default(),
SftpReader::new(client, f, args.range().size().unwrap_or(u64::MAX) as _),
SftpReader::new(client, f, args.range().size()),
))
}

Expand Down
43 changes: 10 additions & 33 deletions core/src/services/sftp/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,17 @@ pub struct SftpReader {

file: File,
chunk: usize,
size: usize,
size: Option<usize>,
read: usize,
buf: BytesMut,
}

impl SftpReader {
pub fn new(conn: PooledConnection<'static, Manager>, file: File, size: usize) -> Self {
pub fn new(conn: PooledConnection<'static, Manager>, file: File, size: Option<u64>) -> Self {
Self {
_conn: conn,
file,
size,
size: size.map(|v| v as usize),
chunk: 2 * 1024 * 1024,
read: 0,
buf: BytesMut::new(),
Expand All @@ -50,35 +50,15 @@ impl SftpReader {

impl oio::Read for SftpReader {
async fn read(&mut self) -> Result<Buffer> {
// let client = self.inner.connect().await?;
//
// let mut fs = client.fs();
// fs.set_cwd(&self.root);
//
// let path = fs
// .canonicalize(&self.path)
// .await
// .map_err(parse_sftp_error)?;
//
// let mut f = client
// .open(path.as_path())
// .await
// .map_err(parse_sftp_error)?;

// f.seek(SeekFrom::Start(offset))
// .await
// .map_err(new_std_io_error)?;

// let mut size = size;
// if size == 0 {
// return Ok(Buffer::new());
// }

if self.read >= self.size {
if Some(self.read) >= self.size {
return Ok(Buffer::new());
}

let size = (self.size - self.read).min(self.chunk);
let size = if let Some(size) = self.size {
(size - self.read).min(self.chunk)
} else {
self.chunk
};
self.buf.reserve(size);

let Some(bytes) = self
Expand All @@ -87,10 +67,7 @@ impl oio::Read for SftpReader {
.await
.map_err(parse_sftp_error)?
else {
return Err(Error::new(
ErrorKind::RangeNotSatisfied,
"sftp read file reaching EoF",
));
return Ok(Buffer::new());
};

self.read += bytes.len();
Expand Down

0 comments on commit c611f34

Please sign in to comment.