Skip to content

Commit

Permalink
fix: Don't retry close if concurrent > 1 to avoid content lost (#3957)
Browse files Browse the repository at this point in the history
* Fix build

Signed-off-by: Xuanwo <[email protected]>

* Fix

Signed-off-by: Xuanwo <[email protected]>

---------

Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo committed Jan 9, 2024
1 parent b956b76 commit 65973eb
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 2 deletions.
10 changes: 9 additions & 1 deletion core/src/raw/oio/write/multipart_upload_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ pub struct MultipartUploadWriter<W: MultipartUploadWrite> {
w: Arc<W>,

upload_id: Option<Arc<String>>,
concurrent: usize,
parts: Vec<MultipartUploadPart>,
cache: Option<oio::ChunkedBytes>,
futures: ConcurrentFutures<WritePartFuture>,
Expand Down Expand Up @@ -163,6 +164,7 @@ impl<W: MultipartUploadWrite> MultipartUploadWriter<W> {

w: Arc::new(inner),
upload_id: None,
concurrent,
parts: Vec::new(),
cache: None,
futures: ConcurrentFutures::new(1.max(concurrent)),
Expand Down Expand Up @@ -275,7 +277,13 @@ where
})));
}
}
while let Some(part) = ready!(self.futures.poll_next_unpin(cx)) {
while let Some(mut part) = ready!(self.futures.poll_next_unpin(cx))
{
// Don't retry close if concurrent write failed.
// TODO: Remove this after <https://github.com/apache/incubator-opendal/issues/3956> addressed.
if self.concurrent > 1 {
part = part.map_err(|err| err.set_permanent());
}
self.parts.push(part?);
}
}
Expand Down
11 changes: 10 additions & 1 deletion core/src/raw/oio/write/range_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ pub struct RangeWriter<W: RangeWrite> {
next_offset: u64,
buffer: Option<oio::ChunkedBytes>,
futures: ConcurrentFutures<WriteRangeFuture>,
concurrent: usize,

w: Arc<W>,
state: State,
Expand Down Expand Up @@ -144,6 +145,7 @@ impl<W: RangeWrite> RangeWriter<W> {
buffer: None,
location: None,
next_offset: 0,
concurrent,
}
}

Expand Down Expand Up @@ -220,7 +222,14 @@ impl<W: RangeWrite> oio::Write for RangeWriter<W> {
match self.location.clone() {
Some(location) => {
if !self.futures.is_empty() {
while let Some(result) = ready!(self.futures.poll_next_unpin(cx)) {
while let Some(mut result) =
ready!(self.futures.poll_next_unpin(cx))
{
// Don't retry close if concurrent write failed.
// TODO: Remove this after <https://github.com/apache/incubator-opendal/issues/3956> addressed.
if self.concurrent > 1 {
result = result.map_err(|err| err.set_permanent());
}
result?;
}
}
Expand Down

0 comments on commit 65973eb

Please sign in to comment.