Skip to content

Commit

Permalink
fix(services/hdfs): fix poll_close when retry (#4309)
Browse files Browse the repository at this point in the history
  • Loading branch information
hoslo authored Mar 4, 2024
1 parent 3e0a3fd commit a510291
Showing 1 changed file with 16 additions and 6 deletions.
22 changes: 16 additions & 6 deletions core/src/services/hdfs/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub struct HdfsWriter<F> {
tmp_path: Option<String>,
f: Option<F>,
client: Arc<hdrs::Client>,
fut: Option<BoxFuture<'static, Result<()>>>,
fut: Option<BoxFuture<'static, (F, Result<()>)>>,
}

/// # Safety
Expand Down Expand Up @@ -76,7 +76,11 @@ impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
if let Some(fut) = self.fut.as_mut() {
let res = ready!(fut.poll_unpin(cx));
self.fut = None;
return Poll::Ready(res);
if let Err(e) = res.1 {
self.f = Some(res.0);
return Poll::Ready(Err(e));
}
return Poll::Ready(Ok(()));
}

let mut f = self.f.take().expect("HdfsWriter must be initialized");
Expand All @@ -86,15 +90,21 @@ impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
let client = self.client.clone();

self.fut = Some(Box::pin(async move {
f.close().await.map_err(new_std_io_error)?;
if let Err(e) = f.close().await.map_err(new_std_io_error) {
// Reserve the original file handle for retry.
return (f, Err(e));
}

if let Some(tmp_path) = tmp_path {
client
if let Err(e) = client
.rename_file(&tmp_path, &target_path)
.map_err(new_std_io_error)?;
.map_err(new_std_io_error)
{
return (f, Err(e));
}
}

Ok(())
(f, Ok(()))
}));
}
}
Expand Down

0 comments on commit a510291

Please sign in to comment.