Skip to content

Commit

Permalink
feat(core): Allow retry in concurrent write operations (#3958)
Browse files Browse the repository at this point in the history
* Fix concurrent retry

Signed-off-by: Xuanwo <[email protected]>

* FIx retry for range write

Signed-off-by: Xuanwo <[email protected]>

* Fix typo

Signed-off-by: Xuanwo <[email protected]>

* Add test for retry in concurrent

Signed-off-by: Xuanwo <[email protected]>

* FIx

Signed-off-by: Xuanwo <[email protected]>

* Fix build

Signed-off-by: Xuanwo <[email protected]>

---------

Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo authored Jan 10, 2024
1 parent 8714ad7 commit 8315b7e
Show file tree
Hide file tree
Showing 5 changed files with 440 additions and 109 deletions.
22 changes: 19 additions & 3 deletions core/src/raw/futures_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ where
self.remaining() > 0
}

/// Push new future into the queue.
pub fn push(&mut self, f: F) {
/// Push new future into the end of queue.
pub fn push_back(&mut self, f: F) {
debug_assert!(
self.has_remaining(),
"concurrent futures must have remaining space"
Expand All @@ -149,6 +149,22 @@ where
Tasks::Large(v) => v.push_back(f),
}
}

/// Push new future into the start of queue, this task will be exactly the next to poll.
pub fn push_front(&mut self, f: F) {
debug_assert!(
self.has_remaining(),
"concurrent futures must have remaining space"
);

match &mut self.tasks {
Tasks::Once(fut) => {
*fut = Some(f);
}
Tasks::Small(v) => v.push_front(TaskResult::Polling(f)),
Tasks::Large(v) => v.push_front(f),
}
}
}

impl<F> futures::Stream for ConcurrentFutures<F>
Expand Down Expand Up @@ -230,7 +246,7 @@ mod tests {
idx
};
self.idx += 1;
self.tasks.push(Box::pin(fut));
self.tasks.push_back(Box::pin(fut));
}

if let Some(v) = ready!(self.tasks.poll_next_unpin(cx)) {
Expand Down
26 changes: 14 additions & 12 deletions core/src/raw/oio/write/block_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,11 @@ where
self.block_ids.push(block_id.clone());
let w = self.w.clone();
let size = cache.len();
self.futures.push(WriteBlockFuture(Box::pin(async move {
w.write_block(size as u64, block_id, AsyncBody::ChunkedBytes(cache))
.await
})));
self.futures
.push_back(WriteBlockFuture(Box::pin(async move {
w.write_block(size as u64, block_id, AsyncBody::ChunkedBytes(cache))
.await
})));
let size = self.fill_cache(bs);
return Poll::Ready(Ok(size));
} else if let Some(res) = ready!(self.futures.poll_next_unpin(cx)) {
Expand Down Expand Up @@ -221,14 +222,15 @@ where
self.block_ids.push(block_id.clone());
let size = cache.len();
let w = self.w.clone();
self.futures.push(WriteBlockFuture(Box::pin(async move {
w.write_block(
size as u64,
block_id,
AsyncBody::ChunkedBytes(cache),
)
.await
})));
self.futures
.push_back(WriteBlockFuture(Box::pin(async move {
w.write_block(
size as u64,
block_id,
AsyncBody::ChunkedBytes(cache),
)
.await
})));
}
}
while let Some(res) = ready!(self.futures.poll_next_unpin(cx)) {
Expand Down
Loading

0 comments on commit 8315b7e

Please sign in to comment.