diff --git a/core/src/raw/oio/write/multipart_upload_write.rs b/core/src/raw/oio/write/multipart_upload_write.rs index 66105b4d4a4..3a3d81f891a 100644 --- a/core/src/raw/oio/write/multipart_upload_write.rs +++ b/core/src/raw/oio/write/multipart_upload_write.rs @@ -133,6 +133,7 @@ pub struct MultipartUploadWriter { w: Arc, upload_id: Option>, + concurrent: usize, parts: Vec, cache: Option, futures: ConcurrentFutures, @@ -163,6 +164,7 @@ impl MultipartUploadWriter { w: Arc::new(inner), upload_id: None, + concurrent, parts: Vec::new(), cache: None, futures: ConcurrentFutures::new(1.max(concurrent)), @@ -275,7 +277,13 @@ where }))); } } - while let Some(part) = ready!(self.futures.poll_next_unpin(cx)) { + while let Some(mut part) = ready!(self.futures.poll_next_unpin(cx)) + { + // Don't retry close if concurrent write failed. + // TODO: Remove this after addressed. + if self.concurrent > 1 { + part = part.map_err(|err| err.set_permanent()); + } self.parts.push(part?); } } diff --git a/core/src/raw/oio/write/range_write.rs b/core/src/raw/oio/write/range_write.rs index bbd81606b07..285f4818fe9 100644 --- a/core/src/raw/oio/write/range_write.rs +++ b/core/src/raw/oio/write/range_write.rs @@ -111,6 +111,7 @@ pub struct RangeWriter { next_offset: u64, buffer: Option, futures: ConcurrentFutures, + concurrent: usize, w: Arc, state: State, @@ -144,6 +145,7 @@ impl RangeWriter { buffer: None, location: None, next_offset: 0, + concurrent, } } @@ -220,7 +222,14 @@ impl oio::Write for RangeWriter { match self.location.clone() { Some(location) => { if !self.futures.is_empty() { - while let Some(result) = ready!(self.futures.poll_next_unpin(cx)) { + while let Some(mut result) = + ready!(self.futures.poll_next_unpin(cx)) + { + // Don't retry close if concurrent write failed. + // TODO: Remove this after addressed. + if self.concurrent > 1 { + result = result.map_err(|err| err.set_permanent()); + } result?; } }