Skip to content

Commit

Permalink
fix: TimeoutLayer doesn't work as expected for writer
Browse files Browse the repository at this point in the history
  • Loading branch information
Zhenglin Li committed Mar 6, 2024
1 parent 453fea6 commit 227ea1a
Showing 1 changed file with 54 additions and 12 deletions.
66 changes: 54 additions & 12 deletions core/src/layers/timeout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,27 +335,69 @@ impl<R: oio::Read> oio::Read for TimeoutWrapper<R> {

impl<R: oio::Write> oio::Write for TimeoutWrapper<R> {
fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> {
self.poll_timeout(cx, WriteOperation::Write.into_static())?;
let sleep = self
.sleep
.get_or_insert_with(|| Box::pin(tokio::time::sleep(self.timeout)));

let v = ready!(self.inner.poll_write(cx, bs));
self.sleep = None;
Poll::Ready(v)
tokio::select! {
_ = sleep.as_mut() => {
self.sleep = None;
Ready(Err(
Error::new(ErrorKind::Unexpected, "io operation timeout reached")
.with_operation(WriteOperation::Write.into_static())
.with_context("io_timeout", self.timeout.as_secs_f64().to_string())
.set_temporary(),
))
}
result = self.inner.poll_write(cx, bs) => {
self.sleep = None;
Ready(result)
}
}
}

fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
self.poll_timeout(cx, WriteOperation::Close.into_static())?;
let sleep = self
.sleep
.get_or_insert_with(|| Box::pin(tokio::time::sleep(self.timeout)));

let v = ready!(self.inner.poll_close(cx));
self.sleep = None;
Poll::Ready(v)
tokio::select! {
_ = sleep.as_mut() => {
self.sleep = None;
Ready(Err(
Error::new(ErrorKind::Unexpected, "io operation timeout reached")
.with_operation(WriteOperation::Write.into_static())
.with_context("io_timeout", self.timeout.as_secs_f64().to_string())
.set_temporary(),
))
}
result = self.inner.poll_close(cx) => {
self.sleep = None;
Ready(result)
}
}
}

fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
self.poll_timeout(cx, WriteOperation::Abort.into_static())?;
let sleep = self
.sleep
.get_or_insert_with(|| Box::pin(tokio::time::sleep(self.timeout)));

let v = ready!(self.inner.poll_abort(cx));
self.sleep = None;
Poll::Ready(v)
tokio::select! {
_ = sleep.as_mut() => {
self.sleep = None;
Ready(Err(
Error::new(ErrorKind::Unexpected, "io operation timeout reached")
.with_operation(WriteOperation::Write.into_static())
.with_context("io_timeout", self.timeout.as_secs_f64().to_string())
.set_temporary(),
))
}
result = self.inner.poll_abort(cx) => {
self.sleep = None;
Ready(result)
}
}
}
}

Expand Down

0 comments on commit 227ea1a

Please sign in to comment.