diff --git a/.github/workflows/ci-test.yml b/.github/workflows/ci-test.yml index b29095ec..39b9dfdc 100644 --- a/.github/workflows/ci-test.yml +++ b/.github/workflows/ci-test.yml @@ -98,3 +98,4 @@ jobs: RUST_LOG: "debug" RUST_BACKTRACE: "1" SKIP_CLIPPY: "1" + CLEAN_BEFORE_EACH: "1" diff --git a/CHANGELOG.md b/CHANGELOG.md index 5bbbb2a3..a41b4142 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ # Change log +## v0.2.1 + +- `qiniu_upload_manager::MultiPartsV1Uploader` 总是使用 4 MB 分片大小,无论 `qiniu_upload_manager::DataPartitionProvider` 返回多大的分片大小。 +- `qiniu_upload_manager::SerialMultiPartsUploaderScheduler` 和 `qiniu_upload_manager::ConcurrentMultiPartsUploaderScheduler` 对空间所在区域上传对象失败后,会使用多活区域继续重试,直到其中有一个能成功为止。 + ## v0.2.0 - 大部分 Trait 现在都实现了 Clone,减少了泛型参数以方便被作为 Trait Object 使用 diff --git a/Makefile b/Makefile index ce836155..6d46aadd 100644 --- a/Makefile +++ b/Makefile @@ -27,6 +27,9 @@ doc_test: test: set -e; \ for dir in $(SUBDIRS); do \ + if [ -n "${CLEAN_BEFORE_EACH}" ]; then \ + $(MAKE) clean; \ + fi; \ $(MAKE) -C $$dir test; \ done clean: diff --git a/api-generator/Cargo.toml b/api-generator/Cargo.toml index 7d28960a..0f159f98 100644 --- a/api-generator/Cargo.toml +++ b/api-generator/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "qiniu-api-generator" -version = "0.2.0" +version = "0.2.1" authors = ["Rong Zhou ", "Shanghai Qiniu Information Technologies Co., Ltd."] edition = "2021" rust-version = "1.60.0" @@ -29,8 +29,8 @@ walkdir = "2.3.2" [dev-dependencies] serde_json = "1.0.68" -qiniu-http = { version = "0.2.0", path = "../http" } -qiniu-http-client = { version = "0.2.0", path = "../http-client" } -qiniu-upload-token = { version = "0.2.0", path = "../upload-token" } -qiniu-utils = { version = "0.2.0", path = "../utils" } +qiniu-http = { version = "0.2.1", path = "../http" } +qiniu-http-client = { version = "0.2.1", path = "../http-client" } +qiniu-upload-token = { version = "0.2.1", path = "../upload-token" } +qiniu-utils = { version = "0.2.1", path = "../utils" } indexmap = "1.7.0" diff --git a/apis/Cargo.toml b/apis/Cargo.toml index e8d517ab..9543466e 100644 --- a/apis/Cargo.toml +++ b/apis/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "qiniu-apis" -version = "0.2.0" +version = "0.2.1" authors = ["Rong Zhou ", "Shanghai Qiniu Information Technologies Co., Ltd."] edition = "2021" rust-version = "1.60.0" @@ -22,8 +22,8 @@ indexmap = "1.7.0" futures = { version = "0.3.5", optional = true } async-std = { version = "1.9.0", optional = true } -qiniu-http-client = { version = "0.2.0", path = "../http-client", default-features = false } -qiniu-utils = { version = "0.2.0", path = "../utils" } +qiniu-http-client = { version = "0.2.1", path = "../http-client", default-features = false } +qiniu-utils = { version = "0.2.1", path = "../utils" } [features] default = ["ureq"] diff --git a/apis/README.md b/apis/README.md index 2822b078..e3ae5ac3 100644 --- a/apis/README.md +++ b/apis/README.md @@ -24,21 +24,21 @@ ```toml [dependencies] -qiniu-apis = { version = "0.2.0", features = ["ureq"] } +qiniu-apis = { version = "0.2.1", features = ["ureq"] } ``` ### 启用 Isahc 异步接口 ```toml [dependencies] -qiniu-apis = { version = "0.2.0", features = ["async", "isahc"] } +qiniu-apis = { version = "0.2.1", features = ["async", "isahc"] } ``` ### 启用 Reqwest 异步接口 ```toml [dependencies] -qiniu-apis = { version = "0.2.0", features = ["async", "reqwest"] } +qiniu-apis = { version = "0.2.1", features = ["async", "reqwest"] } ``` ### 其他功能 diff --git a/credential/Cargo.toml b/credential/Cargo.toml index 
eff7eab3..9ca1a4ec 100644 --- a/credential/Cargo.toml +++ b/credential/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "qiniu-credential" -version = "0.2.0" +version = "0.2.1" authors = ["Rong Zhou ", "Shanghai Qiniu Information Technologies Co., Ltd."] edition = "2021" rust-version = "1.60.0" @@ -24,7 +24,7 @@ assert-impl = "0.1.3" auto_impl = "1.0.0" futures-lite = { version = "1.12.0", optional = true } -qiniu-utils = { version = "0.2.0", path = "../utils" } +qiniu-utils = { version = "0.2.1", path = "../utils" } [dev-dependencies] anyhow = "1.0.41" diff --git a/credential/README.md b/credential/README.md index 479dbcc4..14d2e424 100644 --- a/credential/README.md +++ b/credential/README.md @@ -20,14 +20,14 @@ ```toml [dependencies] -qiniu-credential = "0.2.0" +qiniu-credential = "0.2.1" ``` ### 启用异步接口 ```toml [dependencies] -qiniu-credential = { version = "0.2.0", features = ["async"] } +qiniu-credential = { version = "0.2.1", features = ["async"] } ``` ## 代码示例 diff --git a/download-manager/Cargo.toml b/download-manager/Cargo.toml index 7fe94e78..fe95cc64 100644 --- a/download-manager/Cargo.toml +++ b/download-manager/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "qiniu-download-manager" -version = "0.2.0" +version = "0.2.1" authors = ["Rong Zhou ", "Shanghai Qiniu Information Technologies Co., Ltd."] edition = "2021" rust-version = "1.60.0" @@ -26,7 +26,7 @@ futures = { version = "0.3.5", optional = true } async-std = { version = "1.9.0", optional = true } smart-default = { version = "0.6.0", optional = true } -qiniu-apis = { version = "0.2.0", path = "../apis", default-features = false } +qiniu-apis = { version = "0.2.1", path = "../apis", default-features = false } [dev-dependencies] rand = "0.8.3" diff --git a/download-manager/README.md b/download-manager/README.md index 4d6e341a..efed0a44 100644 --- a/download-manager/README.md +++ b/download-manager/README.md @@ -16,21 +16,21 @@ ```toml [dependencies] -qiniu-download-manager = { version = "0.2.0", features = ["ureq"] } +qiniu-download-manager = { version = "0.2.1", features = ["ureq"] } ``` ### 启用 Isahc 异步接口 ```toml [dependencies] -qiniu-download-manager = { version = "0.2.0", features = ["async", "isahc"] } +qiniu-download-manager = { version = "0.2.1", features = ["async", "isahc"] } ``` ### 启用 Reqwest 异步接口 ```toml [dependencies] -qiniu-download-manager = { version = "0.2.0", features = ["async", "reqwest"] } +qiniu-download-manager = { version = "0.2.1", features = ["async", "reqwest"] } ``` ### 其他功能 diff --git a/etag/Cargo.toml b/etag/Cargo.toml index f4ca683a..92a16dd9 100644 --- a/etag/Cargo.toml +++ b/etag/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "qiniu-etag" -version = "0.2.0" +version = "0.2.1" authors = ["Rong Zhou ", "Shanghai Qiniu Information Technologies Co., Ltd."] edition = "2021" rust-version = "1.60.0" @@ -19,7 +19,7 @@ digest = "0.9.0" assert-impl = "0.1.3" futures-lite = { version = "1.12.0", optional = true } -qiniu-utils = { version = "0.2.0", path = "../utils" } +qiniu-utils = { version = "0.2.1", path = "../utils" } [dev-dependencies] async-std = { version = "1.9.0", features = ["attributes"] } diff --git a/etag/README.md b/etag/README.md index 2b44ebc9..7472f1f0 100644 --- a/etag/README.md +++ b/etag/README.md @@ -18,14 +18,14 @@ ```toml [dependencies] -qiniu-etag = "0.2.0" +qiniu-etag = "0.2.1" ``` ### 启用异步接口 ```toml [dependencies] -qiniu-etag = { version = "0.2.0", features = ["async"] } +qiniu-etag = { version = "0.2.1", features = ["async"] } ``` ## 代码示例 diff --git a/http-client/Cargo.toml 
b/http-client/Cargo.toml index 5c3451d2..b587576c 100644 --- a/http-client/Cargo.toml +++ b/http-client/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "qiniu-http-client" -version = "0.2.0" +version = "0.2.1" authors = ["Rong Zhou ", "Shanghai Qiniu Information Technologies Co., Ltd."] edition = "2021" rust-version = "1.60.0" @@ -56,13 +56,13 @@ trust-dns-resolver = { version = "0.21.2", optional = true } async-std-resolver = { version = "0.21.2", optional = true } async-once-cell = { version = "0.3.0", optional = true } -qiniu-http = { version = "0.2.0", path = "../http" } -qiniu-credential = { version = "0.2.0", path = "../credential" } -qiniu-upload-token = { version = "0.2.0", path = "../upload-token" } -qiniu-reqwest = { version = "0.2.0", path = "../http-reqwest", optional = true } -qiniu-isahc = { version = "0.2.0", path = "../http-isahc", optional = true } -qiniu-ureq = { version = "0.2.0", path = "../http-ureq", optional = true } -qiniu-utils = { version = "0.2.0", path = "../utils" } +qiniu-http = { version = "0.2.1", path = "../http" } +qiniu-credential = { version = "0.2.1", path = "../credential" } +qiniu-upload-token = { version = "0.2.1", path = "../upload-token" } +qiniu-reqwest = { version = "0.2.1", path = "../http-reqwest", optional = true } +qiniu-isahc = { version = "0.2.1", path = "../http-isahc", optional = true } +qiniu-ureq = { version = "0.2.1", path = "../http-ureq", optional = true } +qiniu-utils = { version = "0.2.1", path = "../utils" } [dev-dependencies] async-std = { version = "1.9.0", features = ["attributes"] } diff --git a/http-client/README.md b/http-client/README.md index d9dd89bb..2b89a32f 100644 --- a/http-client/README.md +++ b/http-client/README.md @@ -105,21 +105,21 @@ ```toml [dependencies] -qiniu-http-client = { version = "0.2.0", features = ["ureq"] } +qiniu-http-client = { version = "0.2.1", features = ["ureq"] } ``` ### 启用 Isahc 异步接口 ```toml [dependencies] -qiniu-http-client = { version = "0.2.0", features = ["async", "isahc"] } +qiniu-http-client = { version = "0.2.1", features = ["async", "isahc"] } ``` ### 启用 Reqwest 异步接口 ```toml [dependencies] -qiniu-http-client = { version = "0.2.0", features = ["async", "reqwest"] } +qiniu-http-client = { version = "0.2.1", features = ["async", "reqwest"] } ``` ### 其他功能 diff --git a/http-client/src/client/call/send_http_request.rs b/http-client/src/client/call/send_http_request.rs index e6f62291..58e89f57 100644 --- a/http-client/src/client/call/send_http_request.rs +++ b/http-client/src/client/call/send_http_request.rs @@ -86,7 +86,7 @@ fn need_retry_after_backoff(err: &TryError) -> bool { } fn handle_response_error( - response_error: ResponseError, + mut response_error: ResponseError, http_parts: &mut HttpRequestParts, parts: &InnerRequestParts<'_>, retried: &mut RetriedStatsInfo, @@ -98,6 +98,7 @@ fn handle_response_error( .build(), ); retried.increase_current_endpoint(); + response_error = response_error.set_retry_decision(retry_result.decision()); TryError::new(response_error, retry_result) } diff --git a/http-client/src/client/chooser/shuffled.rs b/http-client/src/client/chooser/shuffled.rs index 11178287..d440270d 100644 --- a/http-client/src/client/chooser/shuffled.rs +++ b/http-client/src/client/chooser/shuffled.rs @@ -101,7 +101,7 @@ mod tests { ); assert_eq!( make_set(ip_chooser.choose(IPS_WITHOUT_PORT, Default::default())), - make_set(&[IpAddrWithPort::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 3)), None)]), + make_set([IpAddrWithPort::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 3)), None)]), ); 
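
The `handle_response_error` hunk above now stamps the retrier's decision onto the `ResponseError` before wrapping it in a `TryError`, and the matching `response/error.rs` hunk later in this diff adds the `retry_decision` field with a builder-style setter and a getter. A minimal sketch of that pattern, using simplified stand-in types rather than the SDK's real definitions (the enum variants here are illustrative; the diff itself shows at least `DontRetry` and `Throttled`):

```rust
/// Stand-in for the retrier's verdict.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RetryDecision {
    DontRetry,
    Throttled,
    RetryRequest,
}

/// Stand-in for `ResponseError`, reduced to the newly added field.
#[derive(Debug, Default)]
struct ResponseError {
    retry_decision: Option<RetryDecision>,
}

impl ResponseError {
    /// Builder-style setter, mirroring the new `set_retry_decision`.
    fn set_retry_decision(mut self, decision: RetryDecision) -> Self {
        self.retry_decision = Some(decision);
        self
    }

    /// Getter, mirroring the new `retry_decision`.
    fn retry_decision(&self) -> Option<RetryDecision> {
        self.retry_decision
    }
}

fn main() {
    // What `handle_response_error` now does after consulting the request retrier:
    let err = ResponseError::default().set_retry_decision(RetryDecision::Throttled);
    // Downstream code (for example an upload scheduler deciding whether to try
    // another region) can read the decision back from the error.
    assert_eq!(err.retry_decision(), Some(RetryDecision::Throttled));
}
```

Further down, the `Endpoints::md5` rewrite hashes the preferred and alternative endpoint lists separately, with a separator after every endpoint and another between the two lists, and the new `Endpoints::similar` simply compares those digests. The effect is that reordering endpoints within a list keeps two `Endpoints` "similar", while moving a domain from one list to the other does not. A sketch of that hashing scheme, assuming the `md-5` crate (imported as `md5`) and plain string slices instead of the SDK's `Endpoint` type:

```rust
use md5::{Digest, Md5};

fn endpoints_digest(preferred: &[&str], alternative: &[&str]) -> Vec<u8> {
    let mut preferred = preferred.to_vec();
    let mut alternative = alternative.to_vec();
    // Sorting makes the digest order-insensitive within each list.
    preferred.sort_unstable();
    alternative.sort_unstable();

    let mut md5 = Md5::default();
    for endpoint in preferred {
        md5.update(endpoint.as_bytes());
        md5.update(b"\0"); // separator after every endpoint
    }
    md5.update(b"\n"); // separator between the two lists
    for endpoint in alternative {
        md5.update(endpoint.as_bytes());
        md5.update(b"\0");
    }
    md5.finalize().to_vec()
}

fn main() {
    // Same preferred domains in a different order: digests match ("similar").
    assert_eq!(
        endpoints_digest(&["up-a.example.com", "up-b.example.com"], &[]),
        endpoints_digest(&["up-b.example.com", "up-a.example.com"], &[]),
    );
    // Same domains, but one moved to the alternative list: digests differ.
    assert_ne!(
        endpoints_digest(&["up-a.example.com", "up-b.example.com"], &[]),
        endpoints_digest(&["up-a.example.com"], &["up-b.example.com"]),
    );
}
```
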
ip_chooser.feedback( @@ -126,7 +126,7 @@ mod tests { ); assert_eq!( make_set(ip_chooser.choose(IPS_WITHOUT_PORT, Default::default())), - make_set(&[ + make_set([ IpAddrWithPort::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), None), IpAddrWithPort::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)), None), ]) diff --git a/http-client/src/client/chooser/subnet.rs b/http-client/src/client/chooser/subnet.rs index 052f669f..a57d1a3d 100644 --- a/http-client/src/client/chooser/subnet.rs +++ b/http-client/src/client/chooser/subnet.rs @@ -114,7 +114,7 @@ impl Chooser for SubnetChooser { } } } - let chosen_ips = choose_group(subnets_map.into_iter().map(|(_, ips)| ips)).unwrap_or_default(); + let chosen_ips = choose_group(subnets_map.into_values()).unwrap_or_default(); do_some_work_async(&self.inner, need_to_shrink); return chosen_ips.into(); diff --git a/http-client/src/client/request/multipart.rs b/http-client/src/client/request/multipart.rs index e8ccaff2..b8483b9c 100644 --- a/http-client/src/client/request/multipart.rs +++ b/http-client/src/client/request/multipart.rs @@ -282,8 +282,8 @@ mod sync_part { /// 设置阻塞 Multipart 的请求体为文件 pub fn file_path + ?Sized>(path: &S) -> IoResult { let path = Path::new(path); - let file = File::open(&path)?; - let mut metadata = PartMetadata::default().mime(mime_guess::from_path(&path).first_or_octet_stream()); + let file = File::open(path)?; + let mut metadata = PartMetadata::default().mime(mime_guess::from_path(path).first_or_octet_stream()); if let Some(file_name) = path.file_name() { let file_name = match file_name.to_string_lossy() { Cow::Borrowed(str) => FileName::from(str), @@ -420,7 +420,7 @@ mod async_part { pub async fn file_path + ?Sized>(path: &S) -> IoResult> { let path = Path::new(path); let file = File::open(&path).await?; - let mut metadata = PartMetadata::default().mime(mime_guess::from_path(&path).first_or_octet_stream()); + let mut metadata = PartMetadata::default().mime(mime_guess::from_path(path).first_or_octet_stream()); if let Some(file_name) = path.file_name() { let file_name = match file_name.to_string_lossy() { Cow::Borrowed(str) => FileName::from(str), diff --git a/http-client/src/client/response/error.rs b/http-client/src/client/response/error.rs index 6c706354..fbbed8ed 100644 --- a/http-client/src/client/response/error.rs +++ b/http-client/src/client/response/error.rs @@ -1,5 +1,5 @@ use super::{ - super::super::{EndpointParseError, RetriedStatsInfo}, + super::super::{EndpointParseError, RetriedStatsInfo, RetryDecision}, X_LOG_HEADER_NAME, X_REQ_ID_HEADER_NAME, }; use anyhow::Error as AnyError; @@ -62,6 +62,7 @@ pub struct Error { x_headers: XHeaders, response_body_sample: Vec, retried: Option, + retry_decision: Option, extensions: Extensions, } @@ -80,6 +81,7 @@ impl Error { x_headers: Default::default(), response_body_sample: Default::default(), retried: Default::default(), + retry_decision: Default::default(), extensions: Default::default(), } } @@ -96,6 +98,7 @@ impl Error { x_headers: Default::default(), response_body_sample: Default::default(), retried: Default::default(), + retry_decision: Default::default(), extensions: Default::default(), } } @@ -108,6 +111,14 @@ impl Error { self } + /// 设置重试决定 + #[inline] + #[must_use] + pub fn set_retry_decision(mut self, retry_decision: RetryDecision) -> Self { + self.retry_decision = Some(retry_decision); + self + } + /// 设置 HTTP 响应信息 #[inline] #[must_use] @@ -152,6 +163,12 @@ impl Error { self.kind } + /// 获取重试决定 + #[inline] + pub fn retry_decision(&self) -> Option { + self.retry_decision + 
} + /// 获取响应体样本 #[inline] pub fn response_body_sample(&self) -> &[u8] { @@ -214,6 +231,7 @@ impl Error { error: err.into_inner(), response_body_sample: Default::default(), retried: Default::default(), + retry_decision: Default::default(), extensions: Default::default(), } } diff --git a/http-client/src/client/retrier/error.rs b/http-client/src/client/retrier/error.rs index e3c83369..dc72b2fc 100644 --- a/http-client/src/client/retrier/error.rs +++ b/http-client/src/client/retrier/error.rs @@ -37,7 +37,7 @@ impl RequestRetrier for ErrorRetrier { ResponseErrorKind::UnexpectedStatusCode(_) => RetryDecision::DontRetry, ResponseErrorKind::StatusCodeError(status_code) => match status_code.as_u16() { 0..=399 => panic!("Should not arrive here"), - 400..=501 | 579 | 599 | 608 | 612 | 614 | 616 | 618 | 630 | 631 | 632 | 640 | 701 => { + 400..=499 | 501 | 579 | 608 | 612 | 614 | 616 | 618 | 630 | 631 | 632 | 640 | 701 => { RetryDecision::DontRetry } 509 | 573 => RetryDecision::Throttled, diff --git a/http-client/src/regions/endpoints_provider/endpoints.rs b/http-client/src/regions/endpoints_provider/endpoints.rs index 3aa8180b..babb8f11 100644 --- a/http-client/src/regions/endpoints_provider/endpoints.rs +++ b/http-client/src/regions/endpoints_provider/endpoints.rs @@ -66,6 +66,14 @@ impl Endpoints { &self.alternative } + /// 对比两组终端地址列表是否相似 + /// + /// 相似指的是,两个终端地址列表内主要终端地址列表中的域名相同,备选终端地址列表中的域名也相同,但顺序可能不同 + #[inline] + pub fn similar(&self, other: &Self) -> bool { + self.md5() == other.md5() + } + fn from_region(region: &Region, services: &[ServiceName]) -> Self { let mut builder = EndpointsBuilder { preferred: vec![], @@ -111,18 +119,25 @@ impl Endpoints { pub(in super::super) fn md5(&self) -> &Md5Value { self.md5.get_or_init(|| { - let mut all_endpoints: Vec<_> = self - .preferred() - .iter() - .chain(self.alternative().iter()) - .map(|endpoint| endpoint.to_string()) - .collect(); - all_endpoints.sort(); - - all_endpoints + let mut preferred_endpoints = self.preferred().iter().map(|e| e.to_string()).collect::>(); + let mut alternative_endpoints = self.alternative().iter().map(|e| e.to_string()).collect::>(); + + preferred_endpoints.sort(); + alternative_endpoints.sort(); + + let mut md5 = preferred_endpoints .into_iter() .fold(Md5::default(), |mut md5, endpoint| { md5.update(endpoint.as_bytes()); + md5.update(b"\0"); + md5 + }); + md5.update(b"\n"); + alternative_endpoints + .into_iter() + .fold(md5, |mut md5, endpoint| { + md5.update(endpoint.as_bytes()); + md5.update(b"\0"); md5 }) .finalize() diff --git a/http-isahc/Cargo.toml b/http-isahc/Cargo.toml index a0151558..cd1e622a 100644 --- a/http-isahc/Cargo.toml +++ b/http-isahc/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "qiniu-isahc" -version = "0.2.0" +version = "0.2.1" authors = ["Rong Zhou ", "Shanghai Qiniu Information Technologies Co., Ltd."] edition = "2021" rust-version = "1.60.0" @@ -16,7 +16,7 @@ keywords = ["qiniu", "storage"] [dependencies] anyhow = "1.0.41" isahc = { version = "1.4.0" } -qiniu-http = { version = "0.2.0", path = "../http" } +qiniu-http = { version = "0.2.1", path = "../http" } futures = { version = "0.3.16", optional = true } [features] diff --git a/http-isahc/README.md b/http-isahc/README.md index 8d595636..d13c1bef 100644 --- a/http-isahc/README.md +++ b/http-isahc/README.md @@ -18,14 +18,14 @@ ```toml [dependencies] -qiniu-isahc = "0.2.0" +qiniu-isahc = "0.2.1" ``` ### 启用异步接口 ```toml [dependencies] -qiniu-isahc = { version = "0.2.0", features = ["async"] } +qiniu-isahc = { version = "0.2.1", features = ["async"] } 
``` ## 最低支持的 Rust 版本(MSRV) diff --git a/http-reqwest/Cargo.toml b/http-reqwest/Cargo.toml index 6533110a..39281a76 100644 --- a/http-reqwest/Cargo.toml +++ b/http-reqwest/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "qiniu-reqwest" -version = "0.2.0" +version = "0.2.1" authors = ["Rong Zhou ", "Shanghai Qiniu Information Technologies Co., Ltd."] edition = "2021" rust-version = "1.60.0" @@ -16,7 +16,7 @@ keywords = ["qiniu", "storage"] [dependencies] anyhow = "1.0.41" reqwest = { version = "0.11.4", features = ["blocking", "stream"] } -qiniu-http = { version = "0.2.0", path = "../http" } +qiniu-http = { version = "0.2.1", path = "../http" } bytes = { version = "1.0.1", optional = true } futures = { version = "0.3.16", optional = true } diff --git a/http-reqwest/README.md b/http-reqwest/README.md index 0c079503..ce72bdef 100644 --- a/http-reqwest/README.md +++ b/http-reqwest/README.md @@ -18,14 +18,14 @@ ```toml [dependencies] -qiniu-reqwest = "0.2.0" +qiniu-reqwest = "0.2.1" ``` ### 启用异步接口 ```toml [dependencies] -qiniu-reqwest = { version = "0.2.0", features = ["async"] } +qiniu-reqwest = { version = "0.2.1", features = ["async"] } ``` ## 最低支持的 Rust 版本(MSRV) diff --git a/http-ureq/Cargo.toml b/http-ureq/Cargo.toml index 1694e292..9081f235 100644 --- a/http-ureq/Cargo.toml +++ b/http-ureq/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "qiniu-ureq" -version = "0.2.0" +version = "0.2.1" authors = ["Rong Zhou ", "Shanghai Qiniu Information Technologies Co., Ltd."] edition = "2021" rust-version = "1.60.0" @@ -17,7 +17,7 @@ keywords = ["qiniu", "storage"] anyhow = "1.0.41" ureq = "2.5.0" -qiniu-http = { version = "0.2.0", path = "../http" } +qiniu-http = { version = "0.2.1", path = "../http" } [features] async = ["qiniu-http/async"] diff --git a/http-ureq/README.md b/http-ureq/README.md index 620b0b31..c18c74b2 100644 --- a/http-ureq/README.md +++ b/http-ureq/README.md @@ -14,7 +14,7 @@ ```toml [dependencies] -qiniu-ureq = "0.2.0" +qiniu-ureq = "0.2.1" ``` ## 最低支持的 Rust 版本(MSRV) diff --git a/http/Cargo.toml b/http/Cargo.toml index 1857b661..c1aa3f24 100644 --- a/http/Cargo.toml +++ b/http/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "qiniu-http" -version = "0.2.0" +version = "0.2.1" authors = ["Rong Zhou ", "Shanghai Qiniu Information Technologies Co., Ltd."] edition = "2021" rust-version = "1.60.0" @@ -23,7 +23,7 @@ serde = "1.0.130" auto_impl = "1.0.0" futures-lite = { version = "1.11.2", optional = true } -qiniu-utils = { version = "0.2.0", path = "../utils" } +qiniu-utils = { version = "0.2.1", path = "../utils" } [build-dependencies] rustc_version = "0.4.0" diff --git a/http/README.md b/http/README.md index 2bec20f2..e1701920 100644 --- a/http/README.md +++ b/http/README.md @@ -19,14 +19,14 @@ ```toml [dependencies] -qiniu-http = "0.2.0" +qiniu-http = "0.2.1" ``` ### 启用异步接口 ```toml [dependencies] -qiniu-http = { version = "0.2.0", features = ["async"] } +qiniu-http = { version = "0.2.1", features = ["async"] } ``` ## 最低支持的 Rust 版本(MSRV) diff --git a/http/src/lib.rs b/http/src/lib.rs index d4d52b5d..a44a7b2b 100644 --- a/http/src/lib.rs +++ b/http/src/lib.rs @@ -125,6 +125,7 @@ static LIBRARY_USER_AGENT: OnceCell = OnceCell::new(); /// 该方法只能调用一次,一旦调用,全局生效 /// /// 每个请求的 UserAgent 由七牛 SDK 固定 UserAgent + 库 UserAgent + 请求的追加 UserAgent 三部分组成 +#[allow(clippy::result_large_err)] pub fn set_library_user_agent(user_agent: UserAgent) -> Result<(), UserAgent> { LIBRARY_USER_AGENT.set(user_agent) } diff --git a/objects-manager/Cargo.toml b/objects-manager/Cargo.toml index a179df7b..5eec0bb9 100644 --- 
a/objects-manager/Cargo.toml +++ b/objects-manager/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "qiniu-objects-manager" -version = "0.2.0" +version = "0.2.1" authors = ["Rong Zhou ", "Shanghai Qiniu Information Technologies Co., Ltd."] edition = "2021" rust-version = "1.60.0" @@ -28,8 +28,8 @@ async-once-cell = { version = "0.3.0", optional = true } dyn-clonable = "0.9.0" auto_impl = "1.0.0" -qiniu-apis = { version = "0.2.0", path = "../apis", default-features = false } -qiniu-utils = { version = "0.2.0", path = "../utils" } +qiniu-apis = { version = "0.2.1", path = "../apis", default-features = false } +qiniu-utils = { version = "0.2.1", path = "../utils" } [dev-dependencies] env_logger = "0.9.0" diff --git a/objects-manager/README.md b/objects-manager/README.md index e8580baa..d7508e87 100644 --- a/objects-manager/README.md +++ b/objects-manager/README.md @@ -17,21 +17,21 @@ ```toml [dependencies] -qiniu-objects-manager = { version = "0.2.0", features = ["ureq"] } +qiniu-objects-manager = { version = "0.2.1", features = ["ureq"] } ``` ### 启用 Isahc 异步接口 ```toml [dependencies] -qiniu-objects-manager = { version = "0.2.0", features = ["async", "isahc"] } +qiniu-objects-manager = { version = "0.2.1", features = ["async", "isahc"] } ``` ### 启用 Reqwest 异步接口 ```toml [dependencies] -qiniu-objects-manager = { version = "0.2.0", features = ["async", "reqwest"] } +qiniu-objects-manager = { version = "0.2.1", features = ["async", "reqwest"] } ``` ### 其他功能 diff --git a/objects-manager/src/list.rs b/objects-manager/src/list.rs index 38f05f71..5af6dcaa 100644 --- a/objects-manager/src/list.rs +++ b/objects-manager/src/list.rs @@ -110,7 +110,7 @@ impl<'a> ListParams<'a> { query_params = query_params.set_limit_as_usize(limit); } if let Some(prefix) = self.prefix.as_ref() { - query_params = query_params.set_prefix_as_str(prefix.to_owned()); + query_params = query_params.set_prefix_as_str(prefix.clone()); } if self.need_parts { query_params = query_params.set_need_parts_as_bool(true); diff --git a/objects-manager/src/operation.rs b/objects-manager/src/operation.rs index 460d7b28..f58dc9fb 100644 --- a/objects-manager/src/operation.rs +++ b/objects-manager/src/operation.rs @@ -601,7 +601,7 @@ impl ModifyObjectStatus<'_> { fn to_path_params(&self) -> qiniu_apis::storage::modify_object_status::PathParams { qiniu_apis::storage::modify_object_status::PathParams::default() .set_entry_as_str(self.entry.to_string()) - .set_status_as_usize(if self.disabled { 1 } else { 0 }) + .set_status_as_usize(usize::from(self.disabled)) } } diff --git a/sdk-examples/Cargo.toml b/sdk-examples/Cargo.toml index c1833f92..4586b639 100644 --- a/sdk-examples/Cargo.toml +++ b/sdk-examples/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "qiniu-sdk-examples" -version = "0.2.0" +version = "0.2.1" authors = ["Rong Zhou ", "Shanghai Qiniu Information Technologies Co., Ltd."] edition = "2021" rust-version = "1.60.0" @@ -15,7 +15,7 @@ anyhow = "1.0.41" async-std = { version = "1.6.3", features = ["attributes"] } env_logger = "0.9.0" log = "0.4.14" -qiniu-sdk = { version = "0.2.0", path = "../sdk", features = ["apis", "upload", "objects", "async", "isahc", "trust_dns"] } +qiniu-sdk = { version = "0.2.1", path = "../sdk", features = ["apis", "upload", "objects", "async", "isahc", "trust_dns"] } structopt = "0.3.23" [dependencies] diff --git a/sdk/Cargo.toml b/sdk/Cargo.toml index 69f12813..59e47874 100644 --- a/sdk/Cargo.toml +++ b/sdk/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "qiniu-sdk" -version = "0.2.0" +version = "0.2.1" authors = ["Rong 
Zhou ", "Shanghai Qiniu Information Technologies Co., Ltd."] edition = "2021" rust-version = "1.60.0" @@ -14,19 +14,19 @@ keywords = ["qiniu", "storage"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -qiniu-utils = { version = "0.2.0", path = "../utils", optional = true } -qiniu-etag = { version = "0.2.0", path = "../etag", optional = true } -qiniu-credential = { version = "0.2.0", path = "../credential", optional = true } -qiniu-upload-token = { version = "0.2.0", path = "../upload-token", optional = true } -qiniu-http = { version = "0.2.0", path = "../http", optional = true } -qiniu-ureq = { version = "0.2.0", path = "../http-ureq", optional = true } -qiniu-isahc = { version = "0.2.0", path = "../http-isahc", optional = true } -qiniu-reqwest = { version = "0.2.0", path = "../http-reqwest", optional = true } -qiniu-http-client = { version = "0.2.0", path = "../http-client", optional = true, default-features = false } -qiniu-apis = { version = "0.2.0", path = "../apis", optional = true, default-features = false } -qiniu-objects-manager = { version = "0.2.0", path = "../objects-manager", optional = true, default-features = false } -qiniu-upload-manager = { version = "0.2.0", path = "../upload-manager", optional = true, default-features = false } -qiniu-download-manager = { version = "0.2.0", path = "../download-manager", optional = true, default-features = false } +qiniu-utils = { version = "0.2.1", path = "../utils", optional = true } +qiniu-etag = { version = "0.2.1", path = "../etag", optional = true } +qiniu-credential = { version = "0.2.1", path = "../credential", optional = true } +qiniu-upload-token = { version = "0.2.1", path = "../upload-token", optional = true } +qiniu-http = { version = "0.2.1", path = "../http", optional = true } +qiniu-ureq = { version = "0.2.1", path = "../http-ureq", optional = true } +qiniu-isahc = { version = "0.2.1", path = "../http-isahc", optional = true } +qiniu-reqwest = { version = "0.2.1", path = "../http-reqwest", optional = true } +qiniu-http-client = { version = "0.2.1", path = "../http-client", optional = true, default-features = false } +qiniu-apis = { version = "0.2.1", path = "../apis", optional = true, default-features = false } +qiniu-objects-manager = { version = "0.2.1", path = "../objects-manager", optional = true, default-features = false } +qiniu-upload-manager = { version = "0.2.1", path = "../upload-manager", optional = true, default-features = false } +qiniu-download-manager = { version = "0.2.1", path = "../download-manager", optional = true, default-features = false } [features] default = ["ureq"] diff --git a/upload-manager/Cargo.toml b/upload-manager/Cargo.toml index d8fc8e6f..5e57430a 100644 --- a/upload-manager/Cargo.toml +++ b/upload-manager/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "qiniu-upload-manager" -version = "0.2.0" +version = "0.2.1" authors = ["Rong Zhou ", "Shanghai Qiniu Information Technologies Co., Ltd."] edition = "2021" rust-version = "1.60.0" @@ -36,9 +36,9 @@ futures = { version = "0.3.5", optional = true } async-once-cell = { version = "0.3.0", optional = true } async-std = { version = "1.10.0", optional = true } -qiniu-apis = { version = "0.2.0", path = "../apis", default-features = false } -qiniu-upload-token = { version = "0.2.0", path = "../upload-token" } -qiniu-utils = { version = "0.2.0", path = "../utils" } +qiniu-apis = { version = "0.2.1", path = "../apis", default-features = false } +qiniu-upload-token = { version = "0.2.1", path 
= "../upload-token" } +qiniu-utils = { version = "0.2.1", path = "../utils" } [dev-dependencies] env_logger = "0.9.0" diff --git a/upload-manager/README.md b/upload-manager/README.md index 4e0db630..3965b596 100644 --- a/upload-manager/README.md +++ b/upload-manager/README.md @@ -16,21 +16,21 @@ ```toml [dependencies] -qiniu-upload-manager = { version = "0.2.0", features = ["ureq"] } +qiniu-upload-manager = { version = "0.2.1", features = ["ureq"] } ``` ### 启用 Isahc 异步接口 ```toml [dependencies] -qiniu-upload-manager = { version = "0.2.0", features = ["async", "isahc"] } +qiniu-upload-manager = { version = "0.2.1", features = ["async", "isahc"] } ``` ### 启用 Reqwest 异步接口 ```toml [dependencies] -qiniu-upload-manager = { version = "0.2.0", features = ["async", "reqwest"] } +qiniu-upload-manager = { version = "0.2.1", features = ["async", "reqwest"] } ``` ### 其他功能 diff --git a/upload-manager/src/multi_parts_uploader/mod.rs b/upload-manager/src/multi_parts_uploader/mod.rs index 9908c3a1..51e7d0f6 100644 --- a/upload-manager/src/multi_parts_uploader/mod.rs +++ b/upload-manager/src/multi_parts_uploader/mod.rs @@ -1,8 +1,17 @@ use super::{ - DataPartitionProvider, DataSource, MultiPartsUploaderWithCallbacks, ObjectParams, ResumableRecorder, UploadManager, + upload_token::OwnedUploadTokenProviderOrReferenced, DataPartitionProvider, DataSource, + MultiPartsUploaderWithCallbacks, ObjectParams, ResumableRecorder, UploadManager, }; use digest::Digest; -use qiniu_apis::http_client::{ApiResult, RegionsProvider}; +use qiniu_apis::{ + credential::AccessKey, + http_client::{ + ApiResult, BucketRegionsProvider, Endpoints, EndpointsGetOptions, EndpointsProvider, GotRegions, + RegionsProvider, RegionsProviderEndpoints, ServiceName, + }, + storage, +}; +use qiniu_upload_token::{BucketName, ObjectName}; use serde_json::Value; use smart_default::SmartDefault; use std::{fmt::Debug, num::NonZeroU64}; @@ -33,6 +42,9 @@ pub trait MultiPartsUploader: resumable_recorder: R, ) -> Self; + /// 获取初始化使用的上传管理器 + fn upload_manager(&self) -> &UploadManager; + /// 初始化分片信息 /// /// 该步骤只负责初始化分片,但不实际上传数据,如果提供了有效的断点续传记录器,则可以尝试在这一步找到记录。 @@ -75,6 +87,17 @@ pub trait MultiPartsUploader: options: ReinitializeOptions, ) -> ApiResult<()>; + /// 尝试恢复记录 + /// + /// 如果提供了有效的断点续传记录器,该方法可以尝试在找到记录,如果找不到记录,或记录无法读取,则返回 `None`。 + /// + /// 该方法的异步版本为 [`Self::try_to_async_resume_records`]。 + fn try_to_resume_parts + 'static>( + &self, + source: D, + params: ObjectParams, + ) -> Option; + #[cfg(feature = "async")] #[cfg_attr(feature = "docs", doc(cfg(feature = "async")))] /// 初始化的异步分片信息 @@ -130,12 +153,26 @@ pub trait MultiPartsUploader: initialized: &'r mut Self::AsyncInitializedParts, options: ReinitializeOptions, ) -> BoxFuture<'r, ApiResult<()>>; + + /// 异步尝试恢复记录 + /// + /// 如果提供了有效的断点续传记录器,该方法可以尝试在找到记录,如果找不到记录,或记录无法读取,则返回 `None`。 + #[cfg(feature = "async")] + #[cfg_attr(feature = "docs", doc(cfg(feature = "async")))] + fn try_to_async_resume_parts + 'static>( + &self, + source: D, + params: ObjectParams, + ) -> BoxFuture>; } /// 初始化的分片信息 pub trait InitializedParts: __private::Sealed + Clone + Send + Sync + Debug { /// 获取对象上传参数 fn params(&self) -> &ObjectParams; + + /// 上传地址列表 + fn up_endpoints(&self) -> &Endpoints; } /// 已经上传的分片信息 @@ -172,6 +209,46 @@ enum ReinitializedUpEndpointsProvider { SpecifiedRegionsProvider(Box), } +impl ReinitializeOptions { + fn get_up_endpoints( + &self, + uploader: &M, + initialized: &M::InitializedParts, + ) -> ApiResult { + match &self.endpoints_provider { + ReinitializedUpEndpointsProvider::KeepOriginalUpEndpoints 
=> Ok(initialized.up_endpoints().to_owned()), + ReinitializedUpEndpointsProvider::RefreshUpEndpoints => uploader.get_up_endpoints(initialized.params()), + ReinitializedUpEndpointsProvider::SpecifiedRegionsProvider(regions_provider) => { + let opts = EndpointsGetOptions::builder().service_names(&[ServiceName::Up]).build(); + Ok(RegionsProviderEndpoints::new(regions_provider) + .get_endpoints(opts)? + .into_owned()) + } + } + } + + #[cfg(feature = "async")] + async fn async_get_up_endpoints( + &self, + uploader: &M, + initialized: &M::AsyncInitializedParts, + ) -> ApiResult { + match &self.endpoints_provider { + ReinitializedUpEndpointsProvider::KeepOriginalUpEndpoints => Ok(initialized.up_endpoints().to_owned()), + ReinitializedUpEndpointsProvider::RefreshUpEndpoints => { + uploader.async_get_up_endpoints(initialized.params()).await + } + ReinitializedUpEndpointsProvider::SpecifiedRegionsProvider(regions_provider) => { + let opts = EndpointsGetOptions::builder().service_names(&[ServiceName::Up]).build(); + Ok(RegionsProviderEndpoints::new(regions_provider) + .async_get_endpoints(opts) + .await? + .into_owned()) + } + } + } +} + /// 重新初始化分片信息的选项构建器 #[derive(Debug, Clone)] pub struct ReinitializeOptionsBuilder(ReinitializeOptions); @@ -216,3 +293,107 @@ mod progress; mod __private { pub trait Sealed {} } + +pub(super) trait MultiPartsUploaderExt: MultiPartsUploader { + fn storage(&self) -> storage::Client { + self.upload_manager().client().storage() + } + + fn access_key(&self) -> ApiResult { + self.upload_manager().upload_token().access_key() + } + + fn bucket_name(&self) -> ApiResult { + self.upload_manager().upload_token().bucket_name() + } + + #[cfg(feature = "async")] + fn async_access_key(&self) -> BoxFuture> { + Box::pin(async move { self.upload_manager().upload_token().async_access_key().await }) + } + + #[cfg(feature = "async")] + fn async_bucket_name(&self) -> BoxFuture> { + Box::pin(async move { self.upload_manager().upload_token().async_bucket_name().await }) + } + + fn get_bucket_regions(&self, params: &ObjectParams) -> ApiResult { + if let Some(region_provider) = params.region_provider() { + region_provider.get_all(Default::default()) + } else { + self.get_bucket_region()?.get_all(Default::default()) + } + } + + #[cfg(feature = "async")] + fn async_get_bucket_regions<'a>(&'a self, params: &'a ObjectParams) -> BoxFuture<'a, ApiResult> { + Box::pin(async move { + if let Some(region_provider) = params.region_provider() { + region_provider.async_get_all(Default::default()).await + } else { + self.async_get_bucket_region() + .await? + .async_get_all(Default::default()) + .await + } + }) + } + + fn get_up_endpoints(&self, params: &ObjectParams) -> ApiResult { + let options = EndpointsGetOptions::builder().service_names(&[ServiceName::Up]).build(); + let up_endpoints = if let Some(region_provider) = params.region_provider() { + RegionsProviderEndpoints::new(region_provider) + .get_endpoints(options)? + .into_owned() + } else { + RegionsProviderEndpoints::new(self.get_bucket_region()?) + .get_endpoints(options)? + .into_owned() + }; + Ok(up_endpoints) + } + + #[cfg(feature = "async")] + fn async_get_up_endpoints<'a>(&'a self, params: &'a ObjectParams) -> BoxFuture<'a, ApiResult> { + Box::pin(async move { + let options = EndpointsGetOptions::builder().service_names(&[ServiceName::Up]).build(); + let up_endpoints = if let Some(region_provider) = params.region_provider() { + RegionsProviderEndpoints::new(region_provider) + .async_get_endpoints(options) + .await? 
+ .into_owned() + } else { + RegionsProviderEndpoints::new(self.async_get_bucket_region().await?) + .async_get_endpoints(options) + .await? + .into_owned() + }; + Ok(up_endpoints) + }) + } + + fn get_bucket_region(&self) -> ApiResult { + Ok(self + .upload_manager() + .queryer() + .query(self.access_key()?, self.bucket_name()?)) + } + + #[cfg(feature = "async")] + fn async_get_bucket_region(&self) -> BoxFuture> { + Box::pin(async move { + Ok(self + .upload_manager() + .queryer() + .query(self.async_access_key().await?, self.async_bucket_name().await?)) + }) + } + + fn make_upload_token_signer(&self, object_name: Option) -> OwnedUploadTokenProviderOrReferenced<'_> { + self.upload_manager() + .upload_token() + .make_upload_token_provider(object_name) + } +} + +impl MultiPartsUploaderExt for M {} diff --git a/upload-manager/src/multi_parts_uploader/v1.rs b/upload-manager/src/multi_parts_uploader/v1.rs index 7c5b3110..f775b0d2 100644 --- a/upload-manager/src/multi_parts_uploader/v1.rs +++ b/upload-manager/src/multi_parts_uploader/v1.rs @@ -2,26 +2,22 @@ use super::{ super::{ callbacks::{Callbacks, UploadingProgressInfo}, data_source::{Digestible, SourceKey}, - upload_token::OwnedUploadTokenProviderOrReferenced, AppendOnlyResumableRecorderMedium, DataPartitionProvider, DataPartitionProviderFeedback, DataSourceReader, - MultiplyDataPartitionProvider, UploaderWithCallbacks, + LimitedDataPartitionProvider, UploaderWithCallbacks, }, progress::{Progresses, ProgressesKey}, - DataSource, InitializedParts, MultiPartsUploader, MultiPartsUploaderWithCallbacks, ObjectParams, PartsExpiredError, - ReinitializeOptions, ReinitializedUpEndpointsProvider, ResumableRecorder, UploadManager, UploadedPart, + DataSource, InitializedParts, MultiPartsUploader, MultiPartsUploaderExt, MultiPartsUploaderWithCallbacks, + ObjectParams, PartsExpiredError, ReinitializeOptions, ResumableRecorder, UploadManager, UploadedPart, }; use anyhow::{Error as AnyError, Result as AnyResult}; use dashmap::DashMap; use digest::Digest; use qiniu_apis::{ - credential::AccessKey, http::{Reset, ResponseErrorKind as HttpResponseErrorKind, ResponseParts}, http_client::{ - ApiResult, BucketRegionsProvider, Endpoints, EndpointsGetOptions, EndpointsProvider, RegionsProviderEndpoints, - RequestBuilderParts, Response, ResponseError, ResponseErrorKind, ServiceName, + ApiResult, Endpoints, EndpointsProvider, RequestBuilderParts, Response, ResponseError, ResponseErrorKind, }, storage::{ - self, resumable_upload_v1_make_block::{ PathParams as MkBlkPathParams, ResponseBody as MkBlkResponseBody, SyncRequestBuilder as SyncMkBlkRequestBuilder, @@ -31,7 +27,7 @@ use qiniu_apis::{ }, }, }; -use qiniu_upload_token::{BucketName, ObjectName}; +use qiniu_upload_token::BucketName; use qiniu_utils::base64::urlsafe as urlsafe_base64; use serde::{Deserialize, Serialize}; use serde_json::Value; @@ -39,7 +35,6 @@ use sha1::Sha1; use std::{ fmt::{self, Debug}, io::{BufRead, BufReader, Cursor, Read, Result as IoResult, Write}, - mem::take, num::NonZeroU64, sync::{Arc, Mutex}, time::{Duration, Instant, SystemTime, UNIX_EPOCH}, @@ -82,7 +77,7 @@ pub struct MultiPartsV1UploaderInitializedObject { source: S, params: ObjectParams, progresses: Progresses, - recovered_records: MultiPartsV1ResumableRecorderRecords, + resumed_records: MultiPartsV1ResumableRecorderRecords, } impl InitializedParts for MultiPartsV1UploaderInitializedObject { @@ -90,16 +85,15 @@ impl InitializedParts for MultiPartsV1UploaderIn fn params(&self) -> &ObjectParams { &self.params } -} - -impl 
super::__private::Sealed for MultiPartsV1UploaderInitializedObject {} -impl MultiPartsV1UploaderInitializedObject { + #[inline] fn up_endpoints(&self) -> &Endpoints { - &self.recovered_records.up_endpoints + &self.resumed_records.up_endpoints } } +impl super::__private::Sealed for MultiPartsV1UploaderInitializedObject {} + impl Debug for MultiPartsV1UploaderInitializedObject { #[inline] fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { @@ -107,7 +101,7 @@ impl Debug for MultiPartsV1UploaderInitializedObject { .field("source", &self.source) .field("params", &self.params) .field("progresses", &self.progresses) - .field("recovered_records", &self.recovered_records) + .field("resumed_records", &self.resumed_records) .finish() } } @@ -229,40 +223,52 @@ impl MultiPartsUploader for MultiPartsV1Uploader } } + #[inline] + fn upload_manager(&self) -> &UploadManager { + &self.upload_manager + } + fn initialize_parts + 'static>( &self, source: D, params: ObjectParams, ) -> ApiResult { - let up_endpoints = self.up_endpoints(¶ms)?; - let recovered_records = self.try_to_recover(&source, up_endpoints)?; + let up_endpoints = self.get_up_endpoints(¶ms)?; + let resumed_records = self.resume_or_create_records(&source, up_endpoints)?; Ok(Self::InitializedParts { source: Box::new(source), params, - recovered_records, + resumed_records, progresses: Default::default(), }) } + fn try_to_resume_parts + 'static>( + &self, + source: D, + params: ObjectParams, + ) -> Option { + source + .source_key() + .ok() + .flatten() + .and_then(|source_key| self.try_to_resume_records(&source_key).ok().flatten()) + .map(|resumed_records| Self::InitializedParts { + source: Box::new(source), + params, + resumed_records, + progresses: Default::default(), + }) + } + fn reinitialize_parts( &self, initialized: &mut Self::InitializedParts, options: ReinitializeOptions, ) -> ApiResult<()> { initialized.source.reset()?; - let up_endpoints = match options.endpoints_provider { - ReinitializedUpEndpointsProvider::KeepOriginalUpEndpoints => { - take(&mut initialized.recovered_records.up_endpoints) - } - ReinitializedUpEndpointsProvider::RefreshUpEndpoints => self.up_endpoints(&initialized.params)?, - ReinitializedUpEndpointsProvider::SpecifiedRegionsProvider(regions_provider) => { - let opts = EndpointsGetOptions::builder().service_names(&[ServiceName::Up]).build(); - RegionsProviderEndpoints::new(regions_provider) - .get_endpoints(opts)? - .into_owned() - } - }; - initialized.recovered_records = self.create_new_records(&initialized.source, up_endpoints)?; + let up_endpoints = options.get_up_endpoints(self, initialized)?; + initialized.resumed_records = self.create_new_records(&initialized.source, up_endpoints)?; initialized.progresses = Default::default(); Ok(()) } @@ -272,13 +278,12 @@ impl MultiPartsUploader for MultiPartsV1Uploader initialized: &Self::InitializedParts, data_partitioner_provider: &dyn DataPartitionProvider, ) -> ApiResult> { - let data_partitioner_provider = - MultiplyDataPartitionProvider::new_with_non_zero_multiply(data_partitioner_provider, PART_SIZE); + let data_partitioner_provider = normalize_data_partitioner_provider(data_partitioner_provider); let total_size = initialized.source.total_size()?; return if let Some(mut reader) = initialized.source.slice(data_partitioner_provider.part_size())? { if let Some(part_size) = NonZeroU64::new(reader.len()?) 
{ let progresses_key = initialized.progresses.add_new_part(part_size.into()); - if let Some(uploaded_part) = _could_recover(initialized, &mut reader, part_size) { + if let Some(uploaded_part) = _could_resume(initialized, &mut reader, part_size) { self.after_part_uploaded(&progresses_key, total_size, Some(&uploaded_part))?; Ok(Some(uploaded_part)) } else { @@ -313,13 +318,13 @@ impl MultiPartsUploader for MultiPartsV1Uploader Ok(None) }; - fn _could_recover( + fn _could_resume( initialized: &MultiPartsV1UploaderInitializedObject>>, data_reader: &mut DataSourceReader, part_size: NonZeroU64, ) -> Option { let offset = data_reader.offset(); - initialized.recovered_records.take(offset).and_then(|record| { + initialized.resumed_records.take(offset).and_then(|record| { if record.size == part_size && record.expired_at() > SystemTime::now() && Some(record.base64ed_sha1.as_str()) == sha1_of_sync_reader(data_reader).ok().as_deref() @@ -337,7 +342,7 @@ impl MultiPartsUploader for MultiPartsV1Uploader } #[allow(clippy::too_many_arguments)] - fn _upload_part<'a, H: Digest, E: EndpointsProvider + Clone + 'a>( + fn _upload_part<'a, H: Digest + Send + 'static, E: EndpointsProvider + Clone + 'a>( uploader: &'a MultiPartsV1Uploader, mut request: SyncMkBlkRequestBuilder<'a, E>, mut body: DataSourceReader, @@ -379,7 +384,7 @@ impl MultiPartsUploader for MultiPartsV1Uploader response_body, }; initialized - .recovered_records + .resumed_records .persist(&record, &uploader.bucket_name()?, initialized.up_endpoints()) .ok(); Ok(MultiPartsV1UploaderUploadedPart::from_record(record, false)) @@ -399,7 +404,7 @@ impl MultiPartsUploader for MultiPartsV1Uploader body, ); - fn _complete_parts<'a, H: Digest, E: EndpointsProvider + Clone + 'a, D: DataSource>( + fn _complete_parts<'a, H: Digest + Send + 'static, E: EndpointsProvider + Clone + 'a, D: DataSource>( uploader: &'a MultiPartsV1Uploader, mut request: SyncMkFileRequestBuilder<'a, E>, source: &D, @@ -434,17 +439,42 @@ impl MultiPartsUploader for MultiPartsV1Uploader params: ObjectParams, ) -> BoxFuture> { Box::pin(async move { - let up_endpoints = self.async_up_endpoints(¶ms).await?; - let recovered_records = self.try_to_async_recover(&source, up_endpoints).await?; + let up_endpoints = self.async_get_up_endpoints(¶ms).await?; + let resumed_records = self.async_resume_or_create_records(&source, up_endpoints).await?; Ok(Self::AsyncInitializedParts { source: Box::new(source), params, - recovered_records, + resumed_records, progresses: Default::default(), }) }) } + #[cfg(feature = "async")] + #[cfg_attr(feature = "docs", doc(cfg(feature = "async")))] + fn try_to_async_resume_parts + 'static>( + &self, + source: D, + params: ObjectParams, + ) -> BoxFuture> { + Box::pin(async move { + if let Some(source_key) = source.source_key().await.ok().flatten() { + self.try_to_async_resume_records(&source_key) + .await + .ok() + .flatten() + .map(|resumed_records| Self::AsyncInitializedParts { + source: Box::new(source), + params, + resumed_records, + progresses: Default::default(), + }) + } else { + None + } + }) + } + #[cfg(feature = "async")] #[cfg_attr(feature = "docs", doc(cfg(feature = "async")))] fn async_reinitialize_parts<'r>( @@ -454,22 +484,8 @@ impl MultiPartsUploader for MultiPartsV1Uploader ) -> BoxFuture<'r, ApiResult<()>> { Box::pin(async move { initialized.source.reset().await?; - let up_endpoints = match options.endpoints_provider { - ReinitializedUpEndpointsProvider::KeepOriginalUpEndpoints => { - take(&mut initialized.recovered_records.up_endpoints) - } - 
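
The renamed `_could_resume` helper above only reuses a recorded part when three checks pass: the recorded size equals the size of the slice about to be uploaded, the record has not expired, and the recorded SHA-1 matches the current data. A small stand-alone sketch of that gate (field names follow the hunk, but this is not the SDK's real record type):

```rust
use std::time::{Duration, SystemTime};

/// Simplified stand-in for one persisted part record.
struct PartRecord {
    size: u64,
    base64ed_sha1: String,
    expired_at: SystemTime,
}

/// A recorded part is reused only if all three checks pass; otherwise it is re-uploaded.
fn could_resume(record: &PartRecord, part_size: u64, sha1_of_data: &str) -> bool {
    record.size == part_size
        && record.expired_at > SystemTime::now()
        && record.base64ed_sha1 == sha1_of_data
}

fn main() {
    let record = PartRecord {
        size: 4 * 1024 * 1024,
        base64ed_sha1: "fake-digest=".to_string(),
        expired_at: SystemTime::now() + Duration::from_secs(3600),
    };
    assert!(could_resume(&record, 4 * 1024 * 1024, "fake-digest="));
    // A checksum mismatch (for example, the local file changed) forces a fresh upload.
    assert!(!could_resume(&record, 4 * 1024 * 1024, "other-digest="));
}
```
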
ReinitializedUpEndpointsProvider::RefreshUpEndpoints => { - self.async_up_endpoints(&initialized.params).await? - } - ReinitializedUpEndpointsProvider::SpecifiedRegionsProvider(regions_provider) => { - let opts = EndpointsGetOptions::builder().service_names(&[ServiceName::Up]).build(); - RegionsProviderEndpoints::new(regions_provider) - .async_get_endpoints(opts) - .await? - .into_owned() - } - }; - initialized.recovered_records = self.async_create_new_records(&initialized.source, up_endpoints).await?; + let up_endpoints = options.async_get_up_endpoints(self, initialized).await?; + initialized.resumed_records = self.async_create_new_records(&initialized.source, up_endpoints).await?; initialized.progresses = Default::default(); Ok(()) }) @@ -483,13 +499,12 @@ impl MultiPartsUploader for MultiPartsV1Uploader data_partitioner_provider: &'r dyn DataPartitionProvider, ) -> BoxFuture<'r, ApiResult>> { return Box::pin(async move { - let data_partitioner_provider = - MultiplyDataPartitionProvider::new_with_non_zero_multiply(data_partitioner_provider, PART_SIZE); + let data_partitioner_provider = normalize_data_partitioner_provider(data_partitioner_provider); let total_size = initialized.source.total_size().await?; if let Some(mut reader) = initialized.source.slice(data_partitioner_provider.part_size()).await? { if let Some(part_size) = NonZeroU64::new(reader.len().await?) { let progresses_key = initialized.progresses.add_new_part(part_size.into()); - if let Some(uploaded_part) = _could_recover(initialized, &mut reader, part_size).await { + if let Some(uploaded_part) = _could_resume(initialized, &mut reader, part_size).await { self.after_part_uploaded(&progresses_key, total_size, Some(&uploaded_part))?; Ok(Some(uploaded_part)) } else { @@ -526,13 +541,13 @@ impl MultiPartsUploader for MultiPartsV1Uploader } }); - async fn _could_recover( + async fn _could_resume( initialized: &MultiPartsV1UploaderInitializedObject>>, data_reader: &mut AsyncDataSourceReader, part_size: NonZeroU64, ) -> Option { let offset = data_reader.offset(); - OptionFuture::from(initialized.recovered_records.take(offset).map(|record| async move { + OptionFuture::from(initialized.resumed_records.take(offset).map(|record| async move { if record.size == part_size && record.expired_at() > SystemTime::now() && Some(record.base64ed_sha1.as_str()) == sha1_of_async_reader(data_reader).await.ok().as_deref() @@ -552,7 +567,7 @@ impl MultiPartsUploader for MultiPartsV1Uploader } #[allow(clippy::too_many_arguments)] - async fn _upload_part<'a, H: Digest, E: EndpointsProvider + Clone + 'a>( + async fn _upload_part<'a, H: Digest + Send + 'static, E: EndpointsProvider + Clone + 'a>( uploader: &'a MultiPartsV1Uploader, mut request: AsyncMkBlkRequestBuilder<'a, E>, mut body: AsyncDataSourceReader, @@ -594,7 +609,7 @@ impl MultiPartsUploader for MultiPartsV1Uploader response_body, }; initialized - .recovered_records + .resumed_records .async_persist( &record, &uploader.async_bucket_name().await?, @@ -628,7 +643,12 @@ impl MultiPartsUploader for MultiPartsV1Uploader .await }); - async fn _complete_parts<'a, H: Digest, E: EndpointsProvider + Clone + 'a, D: AsyncDataSource>( + async fn _complete_parts< + 'a, + H: Digest + Send + 'static, + E: EndpointsProvider + Clone + 'a, + D: AsyncDataSource, + >( uploader: &'a MultiPartsV1Uploader, mut request: AsyncMkFileRequestBuilder<'a, E>, source: &D, @@ -712,29 +732,11 @@ fn may_set_extensions_in_err(err: &mut ResponseError) { } } -impl MultiPartsV1Uploader { - fn storage(&self) -> storage::Client { - 
self.upload_manager.client().storage() - } - - fn access_key(&self) -> ApiResult { - self.upload_manager.upload_token().access_key() - } - - fn bucket_name(&self) -> ApiResult { - self.upload_manager.upload_token().bucket_name() - } - - #[cfg(feature = "async")] - async fn async_access_key(&self) -> ApiResult { - self.upload_manager.upload_token().async_access_key().await - } - - #[cfg(feature = "async")] - async fn async_bucket_name(&self) -> ApiResult { - self.upload_manager.upload_token().async_bucket_name().await - } +fn normalize_data_partitioner_provider(base: P) -> LimitedDataPartitionProvider
{ + LimitedDataPartitionProvider::new_with_non_zero_threshold(base, PART_SIZE, PART_SIZE) +} +impl MultiPartsV1Uploader { fn before_request_call(&self, request: &mut RequestBuilderParts<'_>) -> ApiResult<()> { self.callbacks.before_request(request).map_err(make_callback_error) } @@ -764,103 +766,49 @@ impl MultiPartsV1Uploader { )) .map_err(make_callback_error) } -} - -impl MultiPartsV1Uploader { - fn up_endpoints(&self, params: &ObjectParams) -> ApiResult { - let options = EndpointsGetOptions::builder().service_names(&[ServiceName::Up]).build(); - let up_endpoints = if let Some(region_provider) = params.region_provider() { - RegionsProviderEndpoints::new(region_provider) - .get_endpoints(options)? - .into_owned() - } else { - RegionsProviderEndpoints::new(self.get_bucket_region()?) - .get_endpoints(options)? - .into_owned() - }; - Ok(up_endpoints) - } - - #[cfg(feature = "async")] - async fn async_up_endpoints(&self, params: &ObjectParams) -> ApiResult { - let options = EndpointsGetOptions::builder().service_names(&[ServiceName::Up]).build(); - let up_endpoints = if let Some(region_provider) = params.region_provider() { - RegionsProviderEndpoints::new(region_provider) - .async_get_endpoints(options) - .await? - .into_owned() - } else { - RegionsProviderEndpoints::new(self.async_get_bucket_region().await?) - .async_get_endpoints(options) - .await? - .into_owned() - }; - Ok(up_endpoints) - } - - fn get_bucket_region(&self) -> ApiResult { - Ok(self - .upload_manager - .queryer() - .query(self.access_key()?, self.bucket_name()?)) - } - - #[cfg(feature = "async")] - async fn async_get_bucket_region(&self) -> ApiResult { - Ok(self - .upload_manager - .queryer() - .query(self.async_access_key().await?, self.async_bucket_name().await?)) - } - fn make_upload_token_signer(&self, object_name: Option) -> OwnedUploadTokenProviderOrReferenced<'_> { - self.upload_manager - .upload_token() - .make_upload_token_provider(object_name) - } - - fn try_to_recover>( + fn resume_or_create_records>( &self, source: &D, up_endpoints: Endpoints, ) -> IoResult { let records = if let Some(source_key) = source.source_key()? 
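
`normalize_data_partitioner_provider`, defined just above, wraps whatever `DataPartitionProvider` the caller configured in a `LimitedDataPartitionProvider` whose lower and upper thresholds are both `PART_SIZE`, which is what the changelog entry means by "`MultiPartsV1Uploader` always uses a 4 MB part size". A toy illustration of that clamping, with a plain function standing in for the provider wrapper (the threshold semantics are inferred from the changelog; 4 MB is the block size used by the v1 mkblk/mkfile protocol):

```rust
const PART_SIZE: u64 = 4 * 1024 * 1024;

/// Stand-in for what wrapping a provider in `LimitedDataPartitionProvider` with equal
/// lower and upper thresholds achieves: every suggested size collapses to `PART_SIZE`.
fn normalized_part_size(suggested: u64) -> u64 {
    suggested.clamp(PART_SIZE, PART_SIZE)
}

fn main() {
    for suggested in [64 * 1024, PART_SIZE, 32 * 1024 * 1024] {
        // Whatever the configured partition provider suggests, v1 uploads 4 MB parts.
        assert_eq!(normalized_part_size(suggested), PART_SIZE);
    }
}
```
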
{ - _try_to_recover(self, &source_key) + self.try_to_resume_records(&source_key) .ok() .flatten() .unwrap_or_else(|| _create_new_records(&self.resumable_recorder, &source_key, up_endpoints)) } else { MultiPartsV1ResumableRecorderRecords::new(up_endpoints) }; - return Ok(records); - - fn _try_to_recover( - uploader: &MultiPartsV1Uploader, - source_key: &SourceKey, - ) -> ApiResult> { - let mut medium = uploader.resumable_recorder.open_for_read(source_key)?; - let mut lines = BufReader::new(&mut medium).lines(); - let header = if let Some(line) = lines.next() { - let line = line?; - let header: MultiPartsV1ResumableRecorderDeserializableHeader = serde_json::from_str(&line)?; - if !header.is_v1() || header.bucket() != uploader.bucket_name()?.as_str() { - return Ok(None); - } - header - } else { + Ok(records) + } + + fn try_to_resume_records( + &self, + source_key: &SourceKey, + ) -> ApiResult> { + let mut medium = self.resumable_recorder.open_for_read(source_key)?; + let mut lines = BufReader::new(&mut medium).lines(); + let header = if let Some(line) = lines.next() { + let line = line?; + let header: MultiPartsV1ResumableRecorderDeserializableHeader = serde_json::from_str(&line)?; + if !header.is_v1() || header.bucket() != self.bucket_name()?.as_str() { return Ok(None); - }; - let mut records = lines - .map(|line| { - let line = line?; - let record: MultiPartsV1ResumableRecorderRecord = serde_json::from_str(&line)?; - Ok(record) - }) - .collect::>()?; - records.up_endpoints = header.up_endpoints; - records.set_medium_for_append(uploader.resumable_recorder.open_for_append(source_key)?, true); - Ok(Some(records)) - } + } + header + } else { + return Ok(None); + }; + let mut records = lines + .map(|line| { + let line = line?; + let record: MultiPartsV1ResumableRecorderRecord = serde_json::from_str(&line)?; + Ok(record) + }) + .collect::>()?; + records.up_endpoints = header.up_endpoints; + records.set_medium_for_append(self.resumable_recorder.open_for_append(source_key)?, true); + Ok(Some(records)) } fn create_new_records>( @@ -877,13 +825,13 @@ impl MultiPartsV1Uploader { } #[cfg(feature = "async")] - async fn try_to_async_recover>( + async fn async_resume_or_create_records>( &self, source: &D, up_endpoints: Endpoints, ) -> IoResult { let records = if let Some(source_key) = source.source_key().await? { - if let Some(records) = _try_to_recover(self, &source_key).await.ok().flatten() { + if let Some(records) = self.try_to_async_resume_records(&source_key).await.ok().flatten() { records } else { _async_create_new_records(&self.resumable_recorder, &source_key, up_endpoints).await @@ -891,38 +839,36 @@ impl MultiPartsV1Uploader { } else { MultiPartsV1ResumableRecorderRecords::new(up_endpoints) }; - return Ok(records); - - async fn _try_to_recover( - uploader: &MultiPartsV1Uploader, - source_key: &SourceKey, - ) -> ApiResult> { - let mut medium = uploader.resumable_recorder.open_for_async_read(source_key).await?; - let mut lines = AsyncBufReader::new(&mut medium).lines(); - let header = if let Some(line) = lines.try_next().await? 
{ - let header: MultiPartsV1ResumableRecorderDeserializableHeader = serde_json::from_str(&line)?; - if !header.is_v1() || header.bucket() != uploader.async_bucket_name().await?.as_str() { - return Ok(None); - } - header - } else { + Ok(records) + } + + #[cfg(feature = "async")] + async fn try_to_async_resume_records( + &self, + source_key: &SourceKey, + ) -> ApiResult> { + let mut medium = self.resumable_recorder.open_for_async_read(source_key).await?; + let mut lines = AsyncBufReader::new(&mut medium).lines(); + let header = if let Some(line) = lines.try_next().await? { + let header: MultiPartsV1ResumableRecorderDeserializableHeader = serde_json::from_str(&line)?; + if !header.is_v1() || header.bucket() != self.async_bucket_name().await?.as_str() { return Ok(None); - }; - let mut records = lines - .map(|line| { - let line = line?; - let record: MultiPartsV1ResumableRecorderRecord = serde_json::from_str(&line)?; - Ok::<_, ResponseError>(record) - }) - .try_collect::() - .await?; - records.up_endpoints = header.up_endpoints; - records.set_medium_for_async_append( - uploader.resumable_recorder.open_for_async_append(source_key).await?, - true, - ); - Ok(Some(records)) - } + } + header + } else { + return Ok(None); + }; + let mut records = lines + .map(|line| { + let line = line?; + let record: MultiPartsV1ResumableRecorderRecord = serde_json::from_str(&line)?; + Ok::<_, ResponseError>(record) + }) + .try_collect::() + .await?; + records.up_endpoints = header.up_endpoints; + records.set_medium_for_async_append(self.resumable_recorder.open_for_async_append(source_key).await?, true); + Ok(Some(records)) } #[cfg(feature = "async")] @@ -1517,6 +1463,9 @@ mod tests { let params = ObjectParams::builder() .region_provider(single_up_domain_region()) .build(); + assert!(uploader + .try_to_resume_parts(file_source.to_owned(), params.to_owned()) + .is_none()); let initialized_parts = uploader.initialize_parts(file_source, params)?; uploader .upload_part(&initialized_parts, &new_data_partitioner_provider(BLOCK_SIZE))? 
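
For context on the resume path exercised here: the `try_to_resume_records` helpers above (sync and async) read the recorder medium as one JSON header line, which must match the v1 format and the current bucket, followed by one JSON line per already-uploaded part. A simplified parser over a string, assuming `serde`/`serde_json` with the `derive` feature and an illustrative schema rather than the SDK's exact one (the real header also carries the recorded up endpoints):

```rust
use serde::Deserialize;

/// Illustrative header line.
#[derive(Deserialize)]
struct Header {
    version: u8,
    bucket: String,
}

/// Illustrative per-part record line.
#[derive(Deserialize)]
struct Record {
    offset: u64,
    size: u64,
    base64ed_sha1: String,
}

/// Returns `None` when there is nothing usable to resume (empty input, wrong version,
/// or records written for a different bucket), mirroring the logic in the hunk above.
fn parse_records(data: &str, expected_bucket: &str) -> serde_json::Result<Option<Vec<Record>>> {
    let mut lines = data.lines();
    let header: Header = match lines.next() {
        Some(line) => serde_json::from_str(line)?,
        None => return Ok(None),
    };
    if header.version != 1 || header.bucket != expected_bucket {
        return Ok(None);
    }
    let mut records = Vec::new();
    for line in lines {
        records.push(serde_json::from_str(line)?);
    }
    Ok(Some(records))
}

fn main() {
    let data = "{\"version\":1,\"bucket\":\"test-bucket\"}\n\
                {\"offset\":0,\"size\":4194304,\"base64ed_sha1\":\"abc=\"}\n";
    let records = parse_records(data, "test-bucket").unwrap().unwrap();
    assert_eq!(records.len(), 1);
    assert_eq!((records[0].offset, records[0].size), (0, 4_194_304));
    assert_eq!(records[0].base64ed_sha1, "abc=");
    // Records written for another bucket are ignored and a fresh upload starts instead.
    assert!(parse_records(data, "another-bucket").unwrap().is_none());
}
```
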
@@ -1644,6 +1593,8 @@ mod tests { #[cfg(feature = "async")] #[async_std::test] async fn test_async_multi_parts_v1_upload_with_recovery() -> Result<()> { + use async_std::fs::read_dir as async_read_dir; + env_logger::builder().is_test(true).try_init().ok(); #[derive(Debug)] @@ -1711,6 +1662,10 @@ mod tests { let params = ObjectParams::builder() .region_provider(single_up_domain_region()) .build(); + assert!(uploader + .try_to_async_resume_parts(file_source.to_owned(), params.to_owned()) + .await + .is_none()); let initialized_parts = uploader.async_initialize_parts(file_source, params).await?; uploader .async_upload_part(&initialized_parts, &new_data_partitioner_provider(BLOCK_SIZE)) @@ -1726,7 +1681,7 @@ mod tests { let params = ObjectParams::builder() .region_provider(single_up_domain_region()) .build(); - let initialized_parts = uploader.async_initialize_parts(file_source, params).await?; + let initialized_parts = uploader.try_to_async_resume_parts(file_source, params).await.unwrap(); for _ in 0..2 { uploader .async_upload_part(&initialized_parts, &new_data_partitioner_provider(BLOCK_SIZE)) @@ -1805,7 +1760,7 @@ mod tests { } } - { + let mut initialized_parts = { let uploader = Arc::new(MultiPartsV1Uploader::new( get_upload_manager(FakeHttpCaller2::new(2)), FileSystemResumableRecorder::::new(resuming_files_dir.path()), @@ -1828,13 +1783,100 @@ mod tests { let parts = parts.into_iter().map(|part| part.unwrap()).collect::>(); let body = uploader.async_complete_parts(&initialized_parts, &parts).await?; assert_eq!(body.get("done").unwrap().as_u64().unwrap(), 1u64); + Arc::try_unwrap(initialized_parts).unwrap() + }; + + assert!(async_read_dir(resuming_files_dir.path()).await?.next().await.is_none()); + + #[derive(Debug)] + struct FakeHttpCaller3 { + mkblk_counts: AtomicUsize, + mkfile_counts: AtomicUsize, + } + + impl FakeHttpCaller3 { + fn new() -> Self { + Self { + mkblk_counts: Default::default(), + mkfile_counts: Default::default(), + } + } + } + + impl HttpCaller for FakeHttpCaller3 { + fn call(&self, _request: &mut SyncRequest<'_>) -> SyncResponseResult { + unreachable!() + } + + fn async_call<'a>(&'a self, request: &'a mut AsyncRequest<'_>) -> BoxFuture<'a, AsyncResponseResult> { + Box::pin(async move { + let resp_body = if request.url().path().starts_with("/mkblk/") { + let blk_size: u64; + scan_text!(request.url().path().bytes() => "/mkblk/{}", blk_size); + + match blk_size { + BLOCK_SIZE | LAST_BLOCK_SIZE => { + self.mkblk_counts.fetch_add(1, Ordering::Relaxed); + } + _ => unreachable!(), + } + let body_len = size_of_async_reader(request.body_mut()).await.unwrap(); + assert_eq!(body_len, blk_size); + json_to_vec(&json!({ + "ctx": format!("==={}===", self.mkblk_counts.load(Ordering::Relaxed)), + "checksum": sha1_of_async_reader(request.body_mut()).await.unwrap(), + "offset": blk_size, + "host": "http://fakeexample.com", + "expired_at": (SystemTime::now() + Duration::from_secs(3600)).duration_since(UNIX_EPOCH).unwrap().as_secs(), + })) + .unwrap() + } else if request.url().path().starts_with("/mkfile/") { + assert_eq!(self.mkblk_counts.load(Ordering::Relaxed), 3); + assert_eq!(self.mkfile_counts.fetch_add(1, Ordering::Relaxed), 0); + assert_eq!(request.url().path(), &format!("/mkfile/{}", FILE_SIZE)); + let mut req_body = Vec::new(); + async_io_copy(request.body_mut(), &mut req_body).await.unwrap(); + assert_eq!(String::from_utf8(req_body).unwrap().split(',').count(), BLOCK_COUNT); + json_to_vec(&json!({ + "done": 1, + })) + .unwrap() + } else { + unreachable!() + }; + 
Ok(AsyncResponse::builder() + .status_code(StatusCode::OK) + .header("x-reqid", HeaderValue::from_static("FakeReqid")) + .body(AsyncResponseBody::from_bytes(resp_body)) + .build()) + }) + } + } + + { + let uploader = Arc::new(MultiPartsV1Uploader::new( + get_upload_manager(FakeHttpCaller3::new()), + FileSystemResumableRecorder::::new(resuming_files_dir.path()), + )); + uploader + .async_reinitialize_parts(&mut initialized_parts, Default::default()) + .await?; + let initialized_parts = Arc::new(initialized_parts); + let tasks = (0..BLOCK_COUNT).map(|_| { + let uploader = uploader.to_owned(); + let initialized_parts = initialized_parts.to_owned(); + spawn_task(async move { + uploader + .async_upload_part(&initialized_parts, &new_data_partitioner_provider(BLOCK_SIZE)) + .await + }) + }); + let parts = join_all(tasks).await.into_iter().collect::>>()?; + let parts = parts.into_iter().map(|part| part.unwrap()).collect::>(); + let body = uploader.async_complete_parts(&initialized_parts, &parts).await?; + assert_eq!(body.get("done").unwrap().as_u64().unwrap(), 1u64); } - assert!(async_std::fs::read_dir(resuming_files_dir.path()) - .await? - .next() - .await - .is_none()); Ok(()) } } diff --git a/upload-manager/src/multi_parts_uploader/v2.rs b/upload-manager/src/multi_parts_uploader/v2.rs index 888bbdf8..d9fd3771 100644 --- a/upload-manager/src/multi_parts_uploader/v2.rs +++ b/upload-manager/src/multi_parts_uploader/v2.rs @@ -2,28 +2,24 @@ use super::{ super::{ callbacks::{Callbacks, UploadingProgressInfo}, data_source::{Digestible, SourceKey}, - upload_token::OwnedUploadTokenProviderOrReferenced, AppendOnlyResumableRecorderMedium, DataPartitionProvider, DataPartitionProviderFeedback, DataSourceReader, LimitedDataPartitionProvider, UploaderWithCallbacks, }, progress::{Progresses, ProgressesKey}, v1::make_callback_error, - DataSource, InitializedParts, MultiPartsUploader, MultiPartsUploaderWithCallbacks, ObjectParams, PartsExpiredError, - ReinitializeOptions, ReinitializedUpEndpointsProvider, ResumableRecorder, UploadManager, UploadedPart, + DataSource, InitializedParts, MultiPartsUploader, MultiPartsUploaderExt, MultiPartsUploaderWithCallbacks, + ObjectParams, PartsExpiredError, ReinitializeOptions, ResumableRecorder, UploadManager, UploadedPart, }; use anyhow::Result as AnyResult; use dashmap::DashMap; use digest::Digest; use qiniu_apis::{ base_types::StringMap, - credential::AccessKey, http::{Reset, ResponseParts}, http_client::{ - ApiResult, BucketRegionsProvider, Endpoints, EndpointsGetOptions, EndpointsProvider, RegionsProviderEndpoints, - RequestBuilderParts, Response, ResponseError, ResponseErrorKind, ServiceName, + ApiResult, Endpoints, EndpointsProvider, RequestBuilderParts, Response, ResponseError, ResponseErrorKind, }, storage::{ - self, resumable_upload_v2_complete_multipart_upload::{ PartInfo, PathParams as CompletePartsPathParams, RequestBody as CompletePartsRequestBody, SyncRequestBuilder as SyncCompletePartsRequestBuilder, @@ -90,7 +86,7 @@ pub struct MultiPartsV2UploaderInitializedObject { source: S, params: ObjectParams, progresses: Progresses, - recovered_records: MultiPartsV2ResumableRecorderRecords, + resumed_records: MultiPartsV2ResumableRecorderRecords, info: MultiPartsV2UploaderInitializedResponseInfo, } @@ -106,10 +102,6 @@ impl MultiPartsV2UploaderInitializedObject { pub fn expired_at(&self) -> SystemTime { self.info.expired_at } - - fn up_endpoints(&self) -> &Endpoints { - &self.recovered_records.up_endpoints - } } #[derive(Debug, Clone)] @@ -123,6 +115,11 @@ impl 
InitializedParts for MultiPartsV2UploaderIn fn params(&self) -> &ObjectParams { &self.params } + + #[inline] + fn up_endpoints(&self) -> &Endpoints { + &self.resumed_records.up_endpoints + } } impl super::__private::Sealed for MultiPartsV2UploaderInitializedObject {} @@ -135,7 +132,7 @@ impl Debug for MultiPartsV2UploaderInitializedObject { .field("source", &self.source) .field("params", &self.params) .field("progresses", &self.progresses) - .field("recovered_records", &self.recovered_records) + .field("resumed_records", &self.resumed_records) .finish() } } @@ -264,14 +261,20 @@ impl MultiPartsUploader for MultiPartsV2Uploader } } + #[inline] + fn upload_manager(&self) -> &UploadManager { + &self.upload_manager + } + fn initialize_parts + 'static>( &self, source: D, params: ObjectParams, ) -> ApiResult { - let (info, recovered_records) = self.try_to_recover(&source, ¶ms, self.up_endpoints(¶ms)?)?; + let (info, resumed_records) = + self.resume_or_create_records(&source, ¶ms, self.get_up_endpoints(¶ms)?)?; let info = info.map_or_else( - || self.call_initialize_parts(¶ms, &recovered_records.up_endpoints), + || self.call_initialize_parts(¶ms, &resumed_records.up_endpoints), Ok, )?; @@ -279,30 +282,38 @@ impl MultiPartsUploader for MultiPartsV2Uploader source: Box::new(source), params, info, - recovered_records, + resumed_records, progresses: Default::default(), }) } + fn try_to_resume_parts + 'static>( + &self, + source: D, + params: ObjectParams, + ) -> Option { + source + .source_key() + .ok() + .flatten() + .and_then(|source_key| self.try_to_resume_records(&source_key, ¶ms).ok().flatten()) + .map(|(info, resumed_records)| Self::InitializedParts { + source: Box::new(source), + params, + info, + resumed_records, + progresses: Default::default(), + }) + } + fn reinitialize_parts( &self, initialized: &mut Self::InitializedParts, options: ReinitializeOptions, ) -> ApiResult<()> { initialized.source.reset()?; - let up_endpoints = match options.endpoints_provider { - ReinitializedUpEndpointsProvider::KeepOriginalUpEndpoints => { - take(&mut initialized.recovered_records.up_endpoints) - } - ReinitializedUpEndpointsProvider::RefreshUpEndpoints => self.up_endpoints(&initialized.params)?, - ReinitializedUpEndpointsProvider::SpecifiedRegionsProvider(regions_provider) => { - let opts = EndpointsGetOptions::builder().service_names(&[ServiceName::Up]).build(); - RegionsProviderEndpoints::new(regions_provider) - .get_endpoints(opts)? - .into_owned() - } - }; - initialized.recovered_records = self.create_new_records(&initialized.source, up_endpoints)?; + let up_endpoints = options.get_up_endpoints(self, initialized)?; + initialized.resumed_records = self.create_new_records(&initialized.source, up_endpoints)?; initialized.info = self.call_initialize_parts(&initialized.params, initialized.up_endpoints())?; initialized.progresses = Default::default(); Ok(()) @@ -313,16 +324,12 @@ impl MultiPartsUploader for MultiPartsV2Uploader initialized: &Self::InitializedParts, data_partitioner_provider: &dyn DataPartitionProvider, ) -> ApiResult> { - let data_partitioner_provider = LimitedDataPartitionProvider::new_with_non_zero_threshold( - data_partitioner_provider, - MIN_PART_SIZE, - MAX_PART_SIZE, - ); + let data_partitioner_provider = normalize_data_partitioner_provider(data_partitioner_provider); let total_size = initialized.source.total_size()?; return if let Some(mut reader) = initialized.source.slice(data_partitioner_provider.part_size())? { if let Some(part_size) = NonZeroU64::new(reader.len()?) 
{ let progresses_key = initialized.progresses.add_new_part(part_size.into()); - if let Some(uploaded_part) = _could_recover(initialized, &mut reader, part_size) { + if let Some(uploaded_part) = _could_resume(initialized, &mut reader, part_size) { self.after_part_uploaded(&progresses_key, total_size, Some(&uploaded_part))?; Ok(Some(uploaded_part)) } else { @@ -363,13 +370,13 @@ impl MultiPartsUploader for MultiPartsV2Uploader Ok(None) }; - fn _could_recover( + fn _could_resume( initialized: &MultiPartsV2UploaderInitializedObject>>, data_reader: &mut DataSourceReader, part_size: NonZeroU64, ) -> Option { let offset = data_reader.offset(); - initialized.recovered_records.take(offset).and_then(|record| { + initialized.resumed_records.take(offset).and_then(|record| { if record.size == part_size && record.part_number == data_reader.part_number() && Some(record.base64ed_sha1.as_str()) == sha1_of_sync_reader(data_reader).ok().as_deref() @@ -387,7 +394,7 @@ impl MultiPartsUploader for MultiPartsV2Uploader }) } - fn _upload_part<'a, H: Digest, E: EndpointsProvider + Clone + 'a>( + fn _upload_part<'a, H: Digest + Send + 'static, E: EndpointsProvider + Clone + 'a>( uploader: &'a MultiPartsV2Uploader, mut request: SyncUploadPartRequestBuilder<'a, E>, mut body: DataSourceReader, @@ -427,7 +434,7 @@ impl MultiPartsUploader for MultiPartsV2Uploader part_number, }; initialized - .recovered_records + .resumed_records .persist( &initialized.info, &record, @@ -456,7 +463,7 @@ impl MultiPartsUploader for MultiPartsV2Uploader body, ); - fn _complete_parts<'a, H: Digest, E: EndpointsProvider + Clone + 'a, D: DataSource>( + fn _complete_parts<'a, H: Digest + Send + 'static, E: EndpointsProvider + Clone + 'a, D: DataSource>( uploader: &'a MultiPartsV2Uploader, mut request: SyncCompletePartsRequestBuilder<'a, E>, source: &D, @@ -490,13 +497,13 @@ impl MultiPartsUploader for MultiPartsV2Uploader params: ObjectParams, ) -> BoxFuture> { Box::pin(async move { - let (info, recovered_records) = self - .try_to_async_recover(&source, ¶ms, self.async_up_endpoints(¶ms).await?) + let (info, resumed_records) = self + .async_resume_or_create_records(&source, ¶ms, self.async_get_up_endpoints(¶ms).await?) .await?; let info = if let Some(info) = info { info } else { - self.async_call_initialize_parts(¶ms, &recovered_records.up_endpoints) + self.async_call_initialize_parts(¶ms, &resumed_records.up_endpoints) .await? 
}; @@ -504,12 +511,38 @@ impl MultiPartsUploader for MultiPartsV2Uploader source: Box::new(source), params, info, - recovered_records, + resumed_records, progresses: Default::default(), }) }) } + #[cfg(feature = "async")] + #[cfg_attr(feature = "docs", doc(cfg(feature = "async")))] + fn try_to_async_resume_parts + 'static>( + &self, + source: D, + params: ObjectParams, + ) -> BoxFuture> { + Box::pin(async move { + if let Some(source_key) = source.source_key().await.ok().flatten() { + self.try_to_async_resume_records(&source_key, ¶ms) + .await + .ok() + .flatten() + .map(|(info, resumed_records)| Self::AsyncInitializedParts { + source: Box::new(source), + params, + info, + resumed_records, + progresses: Default::default(), + }) + } else { + None + } + }) + } + #[cfg(feature = "async")] #[cfg_attr(feature = "docs", doc(cfg(feature = "async")))] fn async_reinitialize_parts<'r>( @@ -519,22 +552,8 @@ impl MultiPartsUploader for MultiPartsV2Uploader ) -> BoxFuture> { Box::pin(async move { initialized.source.reset().await?; - let up_endpoints = match options.endpoints_provider { - ReinitializedUpEndpointsProvider::KeepOriginalUpEndpoints => { - take(&mut initialized.recovered_records.up_endpoints) - } - ReinitializedUpEndpointsProvider::RefreshUpEndpoints => { - self.async_up_endpoints(&initialized.params).await? - } - ReinitializedUpEndpointsProvider::SpecifiedRegionsProvider(regions_provider) => { - let opts = EndpointsGetOptions::builder().service_names(&[ServiceName::Up]).build(); - RegionsProviderEndpoints::new(regions_provider) - .async_get_endpoints(opts) - .await? - .into_owned() - } - }; - initialized.recovered_records = self.async_create_new_records(&initialized.source, up_endpoints).await?; + let up_endpoints = options.async_get_up_endpoints(self, initialized).await?; + initialized.resumed_records = self.async_create_new_records(&initialized.source, up_endpoints).await?; initialized.info = self .async_call_initialize_parts(&initialized.params, initialized.up_endpoints()) .await?; @@ -551,16 +570,12 @@ impl MultiPartsUploader for MultiPartsV2Uploader data_partitioner_provider: &'r dyn DataPartitionProvider, ) -> BoxFuture<'r, ApiResult>> { return Box::pin(async move { - let data_partitioner_provider = LimitedDataPartitionProvider::new_with_non_zero_threshold( - data_partitioner_provider, - MIN_PART_SIZE, - MAX_PART_SIZE, - ); + let data_partitioner_provider = normalize_data_partitioner_provider(data_partitioner_provider); let total_size = initialized.source.total_size().await?; if let Some(mut reader) = initialized.source.slice(data_partitioner_provider.part_size()).await? { if let Some(part_size) = NonZeroU64::new(reader.len().await?) 
{ let progresses_key = initialized.progresses.add_new_part(part_size.get()); - if let Some(uploaded_part) = _could_recover(initialized, &mut reader, part_size).await { + if let Some(uploaded_part) = _could_resume(initialized, &mut reader, part_size).await { self.after_part_uploaded(&progresses_key, total_size, Some(&uploaded_part))?; Ok(Some(uploaded_part)) } else { @@ -606,13 +621,13 @@ impl MultiPartsUploader for MultiPartsV2Uploader } }); - async fn _could_recover( + async fn _could_resume( initialized: &MultiPartsV2UploaderInitializedObject>>, data_reader: &mut AsyncDataSourceReader, part_size: NonZeroU64, ) -> Option { let offset = data_reader.offset(); - OptionFuture::from(initialized.recovered_records.take(offset).map(|record| async move { + OptionFuture::from(initialized.resumed_records.take(offset).map(|record| async move { if record.size == part_size && record.part_number == data_reader.part_number() && Some(record.base64ed_sha1.as_str()) == sha1_of_async_reader(data_reader).await.ok().as_deref() @@ -632,7 +647,7 @@ impl MultiPartsUploader for MultiPartsV2Uploader .flatten() } - async fn _upload_part<'a, H: Digest, E: EndpointsProvider + Clone + 'a>( + async fn _upload_part<'a, H: Digest + Send + 'static, E: EndpointsProvider + Clone + 'a>( uploader: &'a MultiPartsV2Uploader, mut request: AsyncUploadPartRequestBuilder<'a, E>, mut body: AsyncDataSourceReader, @@ -672,7 +687,7 @@ impl MultiPartsUploader for MultiPartsV2Uploader part_number, }; initialized - .recovered_records + .resumed_records .async_persist( &initialized.info, &record, @@ -711,7 +726,12 @@ impl MultiPartsUploader for MultiPartsV2Uploader .await }); - async fn _complete_parts<'a, H: Digest, E: EndpointsProvider + Clone + 'a, D: AsyncDataSource>( + async fn _complete_parts< + 'a, + H: Digest + Send + 'static, + E: EndpointsProvider + Clone + 'a, + D: AsyncDataSource, + >( uploader: &'a MultiPartsV2Uploader, mut request: AsyncCompletePartsRequestBuilder<'a, E>, source: &D, @@ -832,29 +852,11 @@ fn may_set_extensions_in_err(err: &mut ResponseError) { } } -impl MultiPartsV2Uploader { - fn storage(&self) -> storage::Client { - self.upload_manager.client().storage() - } - - fn access_key(&self) -> ApiResult { - self.upload_manager.upload_token().access_key() - } - - fn bucket_name(&self) -> ApiResult { - self.upload_manager.upload_token().bucket_name() - } - - #[cfg(feature = "async")] - async fn async_access_key(&self) -> ApiResult { - self.upload_manager.upload_token().async_access_key().await - } - - #[cfg(feature = "async")] - async fn async_bucket_name(&self) -> ApiResult { - self.upload_manager.upload_token().async_bucket_name().await - } +fn normalize_data_partitioner_provider(base: P) -> LimitedDataPartitionProvider
{ + LimitedDataPartitionProvider::new_with_non_zero_threshold(base, MIN_PART_SIZE, MAX_PART_SIZE) +} +impl MultiPartsV2Uploader { fn before_request_call(&self, request: &mut RequestBuilderParts<'_>) -> ApiResult<()> { self.callbacks.before_request(request).map_err(make_callback_error) } @@ -885,58 +887,6 @@ impl MultiPartsV2Uploader { .map_err(make_callback_error) } - fn up_endpoints(&self, params: &ObjectParams) -> ApiResult { - let options = EndpointsGetOptions::builder().service_names(&[ServiceName::Up]).build(); - let up_endpoints = if let Some(region_provider) = params.region_provider() { - RegionsProviderEndpoints::new(region_provider) - .get_endpoints(options)? - .into_owned() - } else { - RegionsProviderEndpoints::new(self.get_bucket_region()?) - .get_endpoints(options)? - .into_owned() - }; - Ok(up_endpoints) - } - - #[cfg(feature = "async")] - async fn async_up_endpoints(&self, params: &ObjectParams) -> ApiResult { - let options = EndpointsGetOptions::builder().service_names(&[ServiceName::Up]).build(); - let up_endpoints = if let Some(region_provider) = params.region_provider() { - RegionsProviderEndpoints::new(region_provider) - .async_get_endpoints(options) - .await? - .into_owned() - } else { - RegionsProviderEndpoints::new(self.async_get_bucket_region().await?) - .async_get_endpoints(options) - .await? - .into_owned() - }; - Ok(up_endpoints) - } - - fn get_bucket_region(&self) -> ApiResult { - Ok(self - .upload_manager - .queryer() - .query(self.access_key()?, self.bucket_name()?)) - } - - #[cfg(feature = "async")] - async fn async_get_bucket_region(&self) -> ApiResult { - Ok(self - .upload_manager - .queryer() - .query(self.async_access_key().await?, self.async_bucket_name().await?)) - } - - fn make_upload_token_signer(&self, object_name: Option) -> OwnedUploadTokenProviderOrReferenced<'_> { - self.upload_manager - .upload_token() - .make_upload_token_provider(object_name) - } - fn call_initialize_parts( &self, params: &ObjectParams, @@ -954,7 +904,7 @@ impl MultiPartsV2Uploader { expired_at: UNIX_EPOCH + Duration::from_secs(body.get_expired_at_as_u64()), }); - fn _initialize_parts<'a, H: Digest, E: EndpointsProvider + Clone + 'a>( + fn _initialize_parts<'a, H: Digest + Send + 'static, E: EndpointsProvider + Clone + 'a>( uploader: &'a MultiPartsV2Uploader, mut request: SyncInitPartsRequestBuilder<'a, E>, ) -> ApiResult { @@ -985,7 +935,7 @@ impl MultiPartsV2Uploader { expired_at: UNIX_EPOCH + Duration::from_secs(initialized_parts.get_expired_at_as_u64()), }); - async fn _initialize_parts<'a, H: Digest, E: EndpointsProvider + Clone + 'a>( + async fn _initialize_parts<'a, H: Digest + Send + 'static, E: EndpointsProvider + Clone + 'a>( uploader: &'a MultiPartsV2Uploader, mut request: AsyncInitPartsRequestBuilder<'a, E>, ) -> ApiResult { @@ -996,7 +946,7 @@ impl MultiPartsV2Uploader { } } - fn try_to_recover>( + fn resume_or_create_records>( &self, source: &D, params: &ObjectParams, @@ -1006,61 +956,64 @@ impl MultiPartsV2Uploader { MultiPartsV2ResumableRecorderRecords, )> { let (info, records) = if let Some(source_key) = source.source_key()? 
{ - _try_to_recover(self, &source_key, params).ok().flatten().map_or_else( - || { - ( - None, - _create_new_records(&self.resumable_recorder, &source_key, up_endpoints), - ) - }, - |(info, records)| (Some(info), records), - ) + self.try_to_resume_records(&source_key, params) + .ok() + .flatten() + .map_or_else( + || { + ( + None, + _create_new_records(&self.resumable_recorder, &source_key, up_endpoints), + ) + }, + |(info, records)| (Some(info), records), + ) } else { (None, MultiPartsV2ResumableRecorderRecords::new(up_endpoints)) }; - return Ok((info, records)); - - fn _try_to_recover( - uploader: &MultiPartsV2Uploader, - source_key: &SourceKey, - params: &ObjectParams, - ) -> ApiResult< - Option<( - MultiPartsV2UploaderInitializedResponseInfo, - MultiPartsV2ResumableRecorderRecords, - )>, - > { - let mut medium = uploader.resumable_recorder.open_for_read(source_key)?; - let mut lines = BufReader::new(&mut medium).lines(); - let (info, up_endpoints) = if let Some(line) = lines.next() { - let line = line?; - let mut header: MultiPartsV2ResumableRecorderDeserializableHeader = serde_json::from_str(&line)?; - if !header.is_v2() - || header.expired_at() <= SystemTime::now() - || header.bucket() != uploader.bucket_name()?.as_str() - || header.object() != params.object_name() - { - return Ok(None); - } - let info = MultiPartsV2UploaderInitializedResponseInfo { - upload_id: take(&mut header.upload_id), - expired_at: header.expired_at(), - }; - (info, header.up_endpoints) - } else { + Ok((info, records)) + } + + fn try_to_resume_records( + &self, + source_key: &SourceKey, + params: &ObjectParams, + ) -> ApiResult< + Option<( + MultiPartsV2UploaderInitializedResponseInfo, + MultiPartsV2ResumableRecorderRecords, + )>, + > { + let mut medium = self.resumable_recorder.open_for_read(source_key)?; + let mut lines = BufReader::new(&mut medium).lines(); + let (info, up_endpoints) = if let Some(line) = lines.next() { + let line = line?; + let mut header: MultiPartsV2ResumableRecorderDeserializableHeader = serde_json::from_str(&line)?; + if !header.is_v2() + || header.expired_at() <= SystemTime::now() + || header.bucket() != self.bucket_name()?.as_str() + || header.object() != params.object_name() + { return Ok(None); + } + let info = MultiPartsV2UploaderInitializedResponseInfo { + upload_id: take(&mut header.upload_id), + expired_at: header.expired_at(), }; - let mut records = lines - .map(|line| { - let line = line?; - let record: MultiPartsV2ResumableRecorderRecord = serde_json::from_str(&line)?; - Ok(record) - }) - .collect::>()?; - records.up_endpoints = up_endpoints; - records.set_medium_for_append(uploader.resumable_recorder.open_for_append(source_key)?, true); - Ok(Some((info, records))) - } + (info, header.up_endpoints) + } else { + return Ok(None); + }; + let mut records = lines + .map(|line| { + let line = line?; + let record: MultiPartsV2ResumableRecorderRecord = serde_json::from_str(&line)?; + Ok(record) + }) + .collect::>()?; + records.up_endpoints = up_endpoints; + records.set_medium_for_append(self.resumable_recorder.open_for_append(source_key)?, true); + Ok(Some((info, records))) } fn create_new_records>( @@ -1077,7 +1030,7 @@ impl MultiPartsV2Uploader { } #[cfg(feature = "async")] - async fn try_to_async_recover>( + async fn async_resume_or_create_records>( &self, source: &D, params: &ObjectParams, @@ -1087,7 +1040,12 @@ impl MultiPartsV2Uploader { MultiPartsV2ResumableRecorderRecords, )> { let result = if let Some(source_key) = source.source_key().await? 
{ - if let Some((info, records)) = _try_to_recover(self, &source_key, params).await.ok().flatten() { + if let Some((info, records)) = self + .try_to_async_resume_records(&source_key, params) + .await + .ok() + .flatten() + { (Some(info), records) } else { ( @@ -1098,52 +1056,50 @@ impl MultiPartsV2Uploader { } else { (None, MultiPartsV2ResumableRecorderRecords::new(up_endpoints)) }; - return Ok(result); - - async fn _try_to_recover( - uploader: &MultiPartsV2Uploader, - source_key: &SourceKey, - params: &ObjectParams, - ) -> ApiResult< - Option<( - MultiPartsV2UploaderInitializedResponseInfo, - MultiPartsV2ResumableRecorderRecords, - )>, - > { - let mut medium = uploader.resumable_recorder.open_for_async_read(source_key).await?; - let mut lines = AsyncBufReader::new(&mut medium).lines(); - let (info, up_endpoints) = if let Some(line) = lines.try_next().await? { - let mut header: MultiPartsV2ResumableRecorderDeserializableHeader = serde_json::from_str(&line)?; - if !header.is_v2() - || header.expired_at() <= SystemTime::now() - || header.bucket() != uploader.bucket_name()?.as_str() - || header.object() != params.object_name() - { - return Ok(None); - } - let info = MultiPartsV2UploaderInitializedResponseInfo { - upload_id: take(&mut header.upload_id), - expired_at: header.expired_at(), - }; - (info, header.up_endpoints) - } else { + Ok(result) + } + + #[cfg(feature = "async")] + async fn try_to_async_resume_records( + &self, + source_key: &SourceKey, + params: &ObjectParams, + ) -> ApiResult< + Option<( + MultiPartsV2UploaderInitializedResponseInfo, + MultiPartsV2ResumableRecorderRecords, + )>, + > { + let mut medium = self.resumable_recorder.open_for_async_read(source_key).await?; + let mut lines = AsyncBufReader::new(&mut medium).lines(); + let (info, up_endpoints) = if let Some(line) = lines.try_next().await? 
{ + let mut header: MultiPartsV2ResumableRecorderDeserializableHeader = serde_json::from_str(&line)?; + if !header.is_v2() + || header.expired_at() <= SystemTime::now() + || header.bucket() != self.bucket_name()?.as_str() + || header.object() != params.object_name() + { return Ok(None); + } + let info = MultiPartsV2UploaderInitializedResponseInfo { + upload_id: take(&mut header.upload_id), + expired_at: header.expired_at(), }; - let mut records = lines - .map(|line| { - let line = line?; - let record: MultiPartsV2ResumableRecorderRecord = serde_json::from_str(&line)?; - Ok::<_, ResponseError>(record) - }) - .try_collect::() - .await?; - records.up_endpoints = up_endpoints; - records.set_medium_for_async_append( - uploader.resumable_recorder.open_for_async_append(source_key).await?, - true, - ); - Ok(Some((info, records))) - } + (info, header.up_endpoints) + } else { + return Ok(None); + }; + let mut records = lines + .map(|line| { + let line = line?; + let record: MultiPartsV2ResumableRecorderRecord = serde_json::from_str(&line)?; + Ok::<_, ResponseError>(record) + }) + .try_collect::() + .await?; + records.up_endpoints = up_endpoints; + records.set_medium_for_async_append(self.resumable_recorder.open_for_async_append(source_key).await?, true); + Ok(Some((info, records))) } #[cfg(feature = "async")] @@ -1969,6 +1925,8 @@ mod tests { #[cfg(feature = "async")] #[async_std::test] async fn test_async_multi_parts_v2_upload_with_recovery() -> Result<()> { + use async_std::fs::read_dir as async_read_dir; + env_logger::builder().is_test(true).try_init().ok(); #[derive(Debug)] @@ -2149,7 +2107,7 @@ mod tests { } } - { + let mut initialized_parts = { let uploader = Arc::new(MultiPartsV2Uploader::new( get_upload_manager(FakeHttpCaller2::new(3)), FileSystemResumableRecorder::::new(resuming_files_dir.path()), @@ -2172,13 +2130,114 @@ mod tests { let parts = parts.into_iter().map(|part| part.unwrap()).collect::>(); let body = uploader.async_complete_parts(&initialized_parts, &parts).await?; assert_eq!(body.get("done").unwrap().as_u64().unwrap(), 1u64); + Arc::try_unwrap(initialized_parts).unwrap() + }; + + assert!(async_read_dir(resuming_files_dir.path()).await?.next().await.is_none()); + + #[derive(Debug, Default)] + struct FakeHttpCaller3 { + init_parts_counts: AtomicUsize, + upload_part_counts: AtomicUsize, + complete_parts_counts: AtomicUsize, + } + + impl HttpCaller for FakeHttpCaller3 { + fn call(&self, _request: &mut SyncRequest<'_>) -> SyncResponseResult { + unreachable!() + } + + #[cfg(feature = "async")] + fn async_call<'a>(&'a self, request: &'a mut AsyncRequest<'_>) -> BoxFuture<'a, AsyncResponseResult> { + Box::pin(async move { + let resp_body = if request + .url() + .path() + .starts_with("/buckets/fakebucket/objects/~/uploads/fakeuploadid/") + { + let page_number: usize; + scan_text!(request.url().path().bytes() => "/buckets/fakebucket/objects/~/uploads/fakeuploadid/{}", page_number); + match size_of_async_reader(request.body_mut()).await.unwrap() { + LAST_BLOCK_SIZE | BLOCK_SIZE => { + self.upload_part_counts.fetch_add(1, Ordering::Relaxed); + } + _ => unimplemented!(), + } + json_to_vec(&json!({ + "etag": format!("==={}===", page_number), + "md5": "fake-md5", + })) + .unwrap() + } else if request.url().path() == "/buckets/fakebucket/objects/~/uploads/fakeuploadid" { + assert_eq!(self.upload_part_counts.load(Ordering::Relaxed), 3); + assert_eq!(self.complete_parts_counts.fetch_add(1, Ordering::Relaxed), 0); + let body: CompletePartsRequestBody = { + let mut req_body = Vec::new(); + 
async_io_copy(request.body_mut(), &mut req_body).await.unwrap(); + serde_json::from_slice(&req_body).unwrap() + }; + body.get_parts().to_part_info_vec().into_iter().fold( + None, + |last_page_number, part_info| { + if let Some(last_page_number) = last_page_number { + assert_eq!(part_info.get_part_number_as_u64(), last_page_number + 1); + assert_eq!( + part_info.get_etag_as_str(), + &format!("==={}===", part_info.get_part_number_as_u64()), + ); + } + Some(part_info.get_part_number_as_u64()) + }, + ); + json_to_vec(&json!({ + "done": 1, + })) + .unwrap() + } else if request.url().path() == "/buckets/fakebucket/objects/~/uploads" { + assert_eq!(self.init_parts_counts.fetch_add(1, Ordering::Relaxed), 0); + assert_eq!(self.upload_part_counts.load(Ordering::Relaxed), 0); + assert_eq!(self.complete_parts_counts.load(Ordering::Relaxed), 0); + json_to_vec(&json!({ + "uploadId": "fakeuploadid", + "expireAt": (SystemTime::now() + Duration::from_secs(3600)).duration_since(UNIX_EPOCH).unwrap().as_secs(), + })) + .unwrap() + } else { + unreachable!() + }; + Ok(AsyncResponse::builder() + .status_code(StatusCode::OK) + .header("x-reqid", HeaderValue::from_static("FakeReqid")) + .body(AsyncResponseBody::from_bytes(resp_body)) + .build()) + }) + } + } + + { + let uploader = Arc::new(MultiPartsV2Uploader::new( + get_upload_manager(FakeHttpCaller3::default()), + FileSystemResumableRecorder::::new(resuming_files_dir.path()), + )); + uploader + .async_reinitialize_parts(&mut initialized_parts, Default::default()) + .await?; + let initialized_parts = Arc::new(initialized_parts); + let tasks = (0..BLOCK_COUNT).map(|_| { + let uploader = uploader.to_owned(); + let initialized_parts = initialized_parts.to_owned(); + spawn_task(async move { + uploader + .async_upload_part(&initialized_parts, &new_data_partitioner_provider(BLOCK_SIZE)) + .await + }) + }); + let parts = join_all(tasks).await.into_iter().collect::>>()?; + let parts = parts.into_iter().map(|part| part.unwrap()).collect::>(); + let body = uploader.async_complete_parts(&initialized_parts, &parts).await?; + assert_eq!(body.get("done").unwrap().as_u64().unwrap(), 1u64); } - assert!(async_std::fs::read_dir(resuming_files_dir.path()) - .await? 
- .next() - .await - .is_none()); Ok(()) } } diff --git a/upload-manager/src/scheduler/concurrent_multi_parts_uploader_scheduler.rs b/upload-manager/src/scheduler/concurrent_multi_parts_uploader_scheduler.rs index e4c09dd8..45139207 100644 --- a/upload-manager/src/scheduler/concurrent_multi_parts_uploader_scheduler.rs +++ b/upload-manager/src/scheduler/concurrent_multi_parts_uploader_scheduler.rs @@ -1,10 +1,14 @@ use super::{ super::{ - multi_parts_uploader::PartsExpiredError, ConcurrencyProvider, ConcurrencyProviderFeedback, - DataPartitionProvider, DataSource, FixedConcurrencyProvider, FixedDataPartitionProvider, MultiPartsUploader, - ObjectParams, UploadedPart, + multi_parts_uploader::{MultiPartsUploaderExt, PartsExpiredError}, + Concurrency, ConcurrencyProvider, ConcurrencyProviderFeedback, DataPartitionProvider, DataSource, + FixedConcurrencyProvider, FixedDataPartitionProvider, MultiPartsUploader, ObjectParams, ReinitializeOptions, + UploadedPart, + }, + utils::{ + keep_original_region_options, need_to_retry, no_region_tried_error, remove_used_region_from_regions, + specify_region_options, UploadPartsError, UploadResumedPartsError, }, - utils::keep_original_region_options, MultiPartsUploaderScheduler, }; use qiniu_apis::http_client::{ApiResult, ResponseError, ResponseErrorKind}; @@ -16,15 +20,15 @@ use std::{ atomic::{AtomicBool, AtomicU64, Ordering}, Mutex, }, - time::Instant, + time::{Duration, Instant}, }; use tap::Tap; #[cfg(feature = "async")] use { - super::{super::Concurrency, AsyncDataSource}, + super::AsyncDataSource, async_std::task::spawn, - futures::future::{join_all, BoxFuture}, + futures::future::{join_all, BoxFuture, OptionFuture}, std::sync::Arc, }; @@ -150,159 +154,330 @@ impl MultiPartsUploaderScheduler Ok(value), - Err((err, uploaded_size, true, Some(mut initialized))) - if err.extensions().get::().is_some() => - { - if self - .multi_parts_uploader - .reinitialize_parts(&mut initialized, keep_original_region_options()) - .is_ok() - { - let begin_at = Instant::now(); - _upload_after_reinitialize(self, &initialized, &thread_pool).tap_mut(|_| { - elapsed = begin_at.elapsed(); - }) - } else { - Err((err, uploaded_size)) + let mut uploaded_size = Default::default(); + let mut elapsed = Default::default(); + return _upload( + self, + source, + params, + concurrency, + &thread_pool, + &mut elapsed, + &mut uploaded_size, + ) + .tap(|result| { + if let Some(uploaded_size) = NonZeroU64::new(uploaded_size) { + let mut builder = ConcurrencyProviderFeedback::builder(concurrency, uploaded_size, elapsed); + if let Some(err) = result.as_ref().err() { + builder.error(err); } + self.concurrency_provider.feedback(builder.build()) } - Err((err, uploaded_size, _, _)) => Err((err, uploaded_size)), - }; - - let (result, uploaded_size) = match result { - Ok((value, uploaded_size)) => (Ok(value), uploaded_size), - Err((err, uploaded_size)) => (Err(err), uploaded_size), - }; + }); - if let Some(uploaded_size) = NonZeroU64::new(uploaded_size) { - let mut builder = ConcurrencyProviderFeedback::builder(concurrency, uploaded_size, elapsed); - if let Some(err) = result.as_ref().err() { - builder.error(err); + fn _upload( + scheduler: &ConcurrentMultiPartsUploaderScheduler, + source: Box>, + params: ObjectParams, + concurrency: Concurrency, + thread_pool: &ThreadPool, + elapsed: &mut Duration, + uploaded_size: &mut u64, + ) -> ApiResult { + match _resume_and_upload( + scheduler, + source.to_owned(), + params.to_owned(), + concurrency, + thread_pool, + elapsed, + uploaded_size, + ) { + None => { 
+ match _try_to_upload_to_all_regions( + scheduler, + source, + params, + None, + concurrency, + thread_pool, + elapsed, + uploaded_size, + ) { + Ok(None) => Err(no_region_tried_error()), + Ok(Some(value)) => Ok(value), + Err(err) => Err(err), + } + } + Some(Err(UploadPartsError { err, .. })) if !need_to_retry(&err) => Err(err), + Some(Err(UploadPartsError { initialized, err })) => { + match _try_to_upload_to_all_regions( + scheduler, + source, + params, + initialized, + concurrency, + thread_pool, + elapsed, + uploaded_size, + ) { + Ok(None) => Err(err), + Ok(Some(value)) => Ok(value), + Err(err) => Err(err), + } + } + Some(Ok(value)) => Ok(value), } - self.concurrency_provider.feedback(builder.build()) } - return result; - type UploadResult = Result< - (Value, u64), - ( - ResponseError, - u64, - bool, - Option<::InitializedParts>, - ), - >; + fn _resume_and_upload( + scheduler: &ConcurrentMultiPartsUploaderScheduler, + source: Box>, + params: ObjectParams, + concurrency: Concurrency, + thread_pool: &ThreadPool, + elapsed: &mut Duration, + uploaded_size: &mut u64, + ) -> Option>> { + _upload_resumed_parts( + scheduler, + source, + params, + concurrency, + thread_pool, + elapsed, + uploaded_size, + ) + .map(|result| match result { + Ok(value) => Ok(value), + Err(UploadResumedPartsError { + err, + resumed: true, + initialized: Some(mut initialized), + }) if err.extensions().get::().is_some() => { + match _reinitialize_and_upload_again( + scheduler, + &mut initialized, + keep_original_region_options(), + concurrency, + thread_pool, + elapsed, + uploaded_size, + ) { + Some(Ok(value)) => Ok(value), + Some(Err(err)) => Err(UploadPartsError::new(err, Some(initialized))), + None => Err(UploadPartsError::new(err, Some(initialized))), + } + } + Err(UploadResumedPartsError { err, initialized, .. 
}) => Err(UploadPartsError::new(err, initialized)), + }) + } - fn _upload( + fn _upload_resumed_parts( scheduler: &ConcurrentMultiPartsUploaderScheduler, source: Box>, params: ObjectParams, + concurrency: Concurrency, thread_pool: &ThreadPool, - ) -> UploadResult { - let initialized = scheduler + elapsed: &mut Duration, + uploaded_size: &mut u64, + ) -> Option>> { + let begin_at = Instant::now(); + scheduler .multi_parts_uploader - .initialize_parts(source, params) - .map_err(|err| (err, 0, false, None))?; - let parts = Mutex::new(Vec::with_capacity(4)); - let any_error = AtomicBool::new(false); - let uploaded_size = AtomicU64::new(0); - let resumed = AtomicBool::new(false); - thread_pool.scope_fifo(|s| { - s.spawn_fifo(|_| { - while !any_error.load(Ordering::SeqCst) { - match scheduler - .multi_parts_uploader - .upload_part(&initialized, &scheduler.data_partition_provider) - { - Ok(Some(uploaded_part)) => { - if uploaded_part.resumed() { - if !resumed.load(Ordering::Relaxed) { - resumed.store(true, Ordering::Relaxed); - } - } else { - uploaded_size.fetch_add(uploaded_part.size().get(), Ordering::Relaxed); - } - parts.lock().unwrap().push(Ok(uploaded_part)); - } - Ok(None) => { - return; - } - Err(err) => { - parts.lock().unwrap().push(Err(err)); - any_error.store(false, Ordering::SeqCst); - return; - } + .try_to_resume_parts(source, params) + .map(|initialized| { + _upload_after_initialize(scheduler, &initialized, concurrency, thread_pool, uploaded_size) + .map_err(|(err, resumed)| UploadResumedPartsError::new(err, resumed, Some(initialized))) + }) + .tap(|_| { + *elapsed = begin_at.elapsed(); + }) + } + + #[allow(clippy::too_many_arguments)] + fn _try_to_upload_to_all_regions( + scheduler: &ConcurrentMultiPartsUploaderScheduler, + source: Box>, + params: ObjectParams, + mut initialized: Option, + concurrency: Concurrency, + thread_pool: &ThreadPool, + elapsed: &mut Duration, + uploaded_size: &mut u64, + ) -> ApiResult> { + let mut regions = scheduler + .multi_parts_uploader + .get_bucket_regions(¶ms) + .map(|r| r.into_regions())?; + if let Some(initialized) = &initialized { + remove_used_region_from_regions(&mut regions, initialized); + } + let mut last_err = None; + for region in regions { + let begin_at = Instant::now(); + let initialized_result = if let Some(mut initialized) = initialized.take() { + scheduler + .multi_parts_uploader + .reinitialize_parts(&mut initialized, specify_region_options(region)) + .map(|_| initialized) + } else { + scheduler + .multi_parts_uploader + .initialize_parts(source.to_owned(), params.to_owned()) + }; + let new_initialized = match initialized_result { + Ok(new_initialized) => { + initialized = Some(new_initialized.to_owned()); + new_initialized + } + Err(err) => { + let to_retry = need_to_retry(&err); + last_err = Some(err); + if to_retry { + continue; + } else { + break; + } + } + }; + let result = + _upload_after_reinitialize(scheduler, &new_initialized, concurrency, thread_pool, uploaded_size); + *elapsed = begin_at.elapsed(); + match result { + Ok(value) => { + return Ok(Some(value)); + } + Err(err) => { + let to_retry = need_to_retry(&err); + last_err = Some(err); + if to_retry { + continue; + } else { + break; } } - }) - }); - let uploaded_size = uploaded_size.into_inner(); - let resumed = resumed.into_inner(); - let parts = match parts.into_inner().unwrap().into_iter().collect::>>() { - Ok(parts) => parts, - Err(err) => { - return Err((err, uploaded_size, resumed, Some(initialized))); } - }; - match 
scheduler.multi_parts_uploader.complete_parts(&initialized, &parts) { - Ok(value) => Ok((value, uploaded_size)), - Err(err) => Err((err, uploaded_size, resumed, Some(initialized))), } + last_err.map_or(Ok(None), Err) } - type ReinitializeResult = Result<(Value, u64), (ResponseError, u64)>; - - fn _upload_after_reinitialize( + fn _upload_after_initialize( scheduler: &ConcurrentMultiPartsUploaderScheduler, initialized: &M::InitializedParts, + concurrency: Concurrency, thread_pool: &ThreadPool, - ) -> ReinitializeResult { + uploaded_size: &mut u64, + ) -> Result { let parts = Mutex::new(Vec::with_capacity(4)); + let atomic_uploaded_size = AtomicU64::new(0); + let atomic_resumed = AtomicBool::new(false); let any_error = AtomicBool::new(false); - let uploaded_size = AtomicU64::new(0); thread_pool.scope_fifo(|s| { - s.spawn_fifo(|_| { - while !any_error.load(Ordering::SeqCst) { - match scheduler - .multi_parts_uploader - .upload_part(initialized, &scheduler.data_partition_provider) - { - Ok(Some(uploaded_part)) => { - if !uploaded_part.resumed() { - uploaded_size.fetch_add(uploaded_part.size().get(), Ordering::Relaxed); + for _ in 0..concurrency.as_usize() { + s.spawn_fifo(|_| { + while !any_error.load(Ordering::SeqCst) { + match scheduler + .multi_parts_uploader + .upload_part(initialized, &scheduler.data_partition_provider) + { + Ok(Some(uploaded_part)) => { + if uploaded_part.resumed() { + if !atomic_resumed.load(Ordering::Relaxed) { + atomic_resumed.store(true, Ordering::Relaxed); + } + } else { + atomic_uploaded_size.fetch_add(uploaded_part.size().get(), Ordering::Relaxed); + } + parts.lock().unwrap().push(Ok(uploaded_part)); + } + Ok(None) => { + return; + } + Err(err) => { + parts.lock().unwrap().push(Err(err)); + any_error.store(false, Ordering::SeqCst); + return; } - parts.lock().unwrap().push(Ok(uploaded_part)); - } - Ok(None) => { - return; - } - Err(err) => { - parts.lock().unwrap().push(Err(err)); - any_error.store(false, Ordering::SeqCst); - return; } } - } - }) + }); + } }); - let uploaded_size = uploaded_size.into_inner(); + *uploaded_size = atomic_uploaded_size.into_inner(); + let resumed = atomic_resumed.into_inner(); let parts = parts .into_inner() .unwrap() .into_iter() .collect::>>() - .map_err(|err| (err, uploaded_size))?; + .map_err(|err| (err, resumed))?; scheduler .multi_parts_uploader .complete_parts(initialized, &parts) - .map(|value| (value, uploaded_size)) - .map_err(|err| (err, uploaded_size)) + .map_err(|err| (err, resumed)) + } + + fn _reinitialize_and_upload_again( + scheduler: &ConcurrentMultiPartsUploaderScheduler, + initialized: &mut M::InitializedParts, + reinitialize_options: ReinitializeOptions, + concurrency: Concurrency, + thread_pool: &ThreadPool, + elapsed: &mut Duration, + uploaded_size: &mut u64, + ) -> Option> { + let begin_at = Instant::now(); + scheduler + .multi_parts_uploader + .reinitialize_parts(initialized, reinitialize_options) + .ok() + .map(|_| _upload_after_reinitialize(scheduler, initialized, concurrency, thread_pool, uploaded_size)) + .tap(|_| { + *elapsed = begin_at.elapsed(); + }) + } + + fn _upload_after_reinitialize( + scheduler: &ConcurrentMultiPartsUploaderScheduler, + initialized: &M::InitializedParts, + concurrency: Concurrency, + thread_pool: &ThreadPool, + uploaded_size: &mut u64, + ) -> ApiResult { + let parts = Mutex::new(Vec::with_capacity(4)); + let atomic_uploaded_size = AtomicU64::new(0); + let any_error = AtomicBool::new(false); + thread_pool.scope_fifo(|s| { + for _ in 0..concurrency.as_usize() { + s.spawn_fifo(|_| { + 
while !any_error.load(Ordering::SeqCst) { + match scheduler + .multi_parts_uploader + .upload_part(initialized, &scheduler.data_partition_provider) + { + Ok(Some(uploaded_part)) => { + if !uploaded_part.resumed() { + atomic_uploaded_size.fetch_add(uploaded_part.size().get(), Ordering::Relaxed); + } + parts.lock().unwrap().push(Ok(uploaded_part)); + } + Ok(None) => { + return; + } + Err(err) => { + parts.lock().unwrap().push(Err(err)); + any_error.store(false, Ordering::SeqCst); + return; + } + } + } + }) + } + }); + *uploaded_size = atomic_uploaded_size.into_inner(); + let parts = parts.into_inner().unwrap().into_iter().collect::>>()?; + scheduler.multi_parts_uploader.complete_parts(initialized, &parts) } } @@ -315,69 +490,230 @@ impl MultiPartsUploaderScheduler BoxFuture> { return Box::pin(async move { let concurrency = self.concurrency_provider.concurrency(); - let begin_at = Instant::now(); - let result = _upload(self, source, params, concurrency).await; - let mut elapsed = begin_at.elapsed(); + let mut uploaded_size = Default::default(); + let mut elapsed = Default::default(); + _upload(self, source, params, concurrency, &mut elapsed, &mut uploaded_size) + .await + .tap(|result| { + if let Some(uploaded_size) = NonZeroU64::new(uploaded_size) { + let mut builder = ConcurrencyProviderFeedback::builder(concurrency, uploaded_size, elapsed); + if let Some(err) = result.as_ref().err() { + builder.error(err); + } + self.concurrency_provider.feedback(builder.build()) + } + }) + }); - let result = match result { - Ok(value) => Ok(value), - Err((err, uploaded_size, true, Some(mut initialized))) - if err.extensions().get::().is_some() => - { - if self - .multi_parts_uploader - .async_reinitialize_parts(&mut initialized, keep_original_region_options()) - .await - .is_ok() + async fn _upload( + scheduler: &ConcurrentMultiPartsUploaderScheduler, + source: Box>, + params: ObjectParams, + concurrency: Concurrency, + elapsed: &mut Duration, + uploaded_size: &mut u64, + ) -> ApiResult { + match _resume_and_upload( + scheduler, + source.to_owned(), + params.to_owned(), + concurrency, + elapsed, + uploaded_size, + ) + .await + { + None => { + match _try_to_upload_to_all_regions( + scheduler, + source, + params, + None, + concurrency, + elapsed, + uploaded_size, + ) + .await { - let begin_at = Instant::now(); - _upload_after_reinitialize(self, initialized, concurrency) - .await - .tap_mut(|_| { - elapsed = begin_at.elapsed(); - }) - } else { - Err((err, uploaded_size)) + Ok(None) => Err(no_region_tried_error()), + Ok(Some(value)) => Ok(value), + Err(err) => Err(err), } } - Err((err, uploaded_size, _, _)) => Err((err, uploaded_size)), - }; - let (result, uploaded_size) = match result { - Ok((value, uploaded_size)) => (Ok(value), uploaded_size), - Err((err, uploaded_size)) => (Err(err), uploaded_size), - }; - if let Some(uploaded_size) = NonZeroU64::new(uploaded_size) { - let mut builder = ConcurrencyProviderFeedback::builder(concurrency, uploaded_size, elapsed); - if let Some(err) = result.as_ref().err() { - builder.error(err); + Some(Err(UploadPartsError { err, .. 
})) if !need_to_retry(&err) => Err(err), + Some(Err(UploadPartsError { initialized, err })) => { + match _try_to_upload_to_all_regions( + scheduler, + source, + params, + initialized, + concurrency, + elapsed, + uploaded_size, + ) + .await + { + Ok(None) => Err(err), + Ok(Some(value)) => Ok(value), + Err(err) => Err(err), + } } - self.concurrency_provider.feedback(builder.build()) + Some(Ok(value)) => Ok(value), } - result - }); + } - async fn _upload( + async fn _resume_and_upload( scheduler: &ConcurrentMultiPartsUploaderScheduler, source: Box>, params: ObjectParams, concurrency: Concurrency, - ) -> Result<(Value, u64), (ResponseError, u64, bool, Option)> { - let initialized = Arc::new( + elapsed: &mut Duration, + uploaded_size: &mut u64, + ) -> Option>> { + OptionFuture::from( + _upload_resumed_parts(scheduler, source, params, concurrency, elapsed, uploaded_size) + .await + .map(|result| async move { + match result { + Ok(value) => Ok(value), + Err(UploadResumedPartsError { + err, + resumed: true, + initialized: Some(mut initialized), + }) if err.extensions().get::().is_some() => { + match _reinitialize_and_upload_again( + scheduler, + &mut initialized, + keep_original_region_options(), + concurrency, + elapsed, + uploaded_size, + ) + .await + { + Some(Ok(value)) => Ok(value), + Some(Err(err)) => Err(UploadPartsError::new(err, Some(initialized))), + None => Err(UploadPartsError::new(err, Some(initialized))), + } + } + Err(UploadResumedPartsError { err, initialized, .. }) => { + Err(UploadPartsError::new(err, initialized)) + } + } + }), + ) + .await + } + + async fn _upload_resumed_parts( + scheduler: &ConcurrentMultiPartsUploaderScheduler, + source: Box>, + params: ObjectParams, + concurrency: Concurrency, + elapsed: &mut Duration, + uploaded_size: &mut u64, + ) -> Option>> { + let begin_at = Instant::now(); + OptionFuture::from( scheduler .multi_parts_uploader - .async_initialize_parts(source, params) + .try_to_async_resume_parts(source, params) .await - .map_err(|err| (err, 0, false, None))?, - ); - let uploaded_size = Arc::new(AtomicU64::new(0)); - let resumed = Arc::new(AtomicBool::new(false)); + .map(|initialized| async move { + _upload_after_initialize(scheduler, initialized.to_owned(), concurrency, uploaded_size) + .await + .map_err(|(err, resumed)| UploadResumedPartsError::new(err, resumed, Some(initialized))) + }), + ) + .await + .tap(|_| { + *elapsed = begin_at.elapsed(); + }) + } + + async fn _try_to_upload_to_all_regions( + scheduler: &ConcurrentMultiPartsUploaderScheduler, + source: Box>, + params: ObjectParams, + mut initialized: Option, + concurrency: Concurrency, + elapsed: &mut Duration, + uploaded_size: &mut u64, + ) -> ApiResult> { + let mut regions = scheduler + .multi_parts_uploader + .async_get_bucket_regions(¶ms) + .await + .map(|r| r.into_regions())?; + if let Some(initialized) = &initialized { + remove_used_region_from_regions(&mut regions, initialized); + } + let mut last_err = None; + for region in regions { + let begin_at = Instant::now(); + let initialized_result = if let Some(mut initialized) = initialized.take() { + scheduler + .multi_parts_uploader + .async_reinitialize_parts(&mut initialized, specify_region_options(region)) + .await + .map(|_| initialized) + } else { + scheduler + .multi_parts_uploader + .async_initialize_parts(source.to_owned(), params.to_owned()) + .await + }; + let new_initialized = match initialized_result { + Ok(new_initialized) => { + initialized = Some(new_initialized.to_owned()); + new_initialized + } + Err(err) => { + let 
to_retry = need_to_retry(&err); + last_err = Some(err); + if to_retry { + continue; + } else { + break; + } + } + }; + let result = _upload_after_reinitialize(scheduler, new_initialized, concurrency, uploaded_size).await; + *elapsed = begin_at.elapsed(); + match result { + Ok(value) => { + return Ok(Some(value)); + } + Err(err) => { + let to_retry = need_to_retry(&err); + last_err = Some(err); + if to_retry { + continue; + } else { + break; + } + } + } + } + last_err.map_or(Ok(None), Err) + } + + async fn _upload_after_initialize( + scheduler: &ConcurrentMultiPartsUploaderScheduler, + initialized: M::AsyncInitializedParts, + concurrency: Concurrency, + uploaded_size: &mut u64, + ) -> Result { + let initialized = Arc::new(initialized); + let atomic_uploaded_size = Arc::new(AtomicU64::new(0)); + let atomic_resumed = Arc::new(AtomicBool::new(false)); let any_error = Arc::new(AtomicBool::new(false)); let results = join_all((0..concurrency.as_usize()).map(|_| { let scheduler = scheduler.to_owned(); let initialized = initialized.to_owned(); let any_error = any_error.to_owned(); - let uploaded_size = uploaded_size.to_owned(); - let resumed = resumed.to_owned(); + let atomic_uploaded_size = atomic_uploaded_size.to_owned(); + let atomic_resumed = atomic_resumed.to_owned(); spawn(async move { let mut parts = Vec::with_capacity(4); while !any_error.load(Ordering::SeqCst) { @@ -388,11 +724,11 @@ impl MultiPartsUploaderScheduler { if uploaded_part.resumed() { - if !resumed.load(Ordering::Relaxed) { - resumed.store(true, Ordering::Relaxed); + if !atomic_resumed.load(Ordering::Relaxed) { + atomic_resumed.store(true, Ordering::Relaxed); } } else { - uploaded_size.fetch_add(uploaded_part.size().get(), Ordering::Relaxed); + atomic_uploaded_size.fetch_add(uploaded_part.size().get(), Ordering::Relaxed); } parts.push(Ok(uploaded_part)); } @@ -411,45 +747,60 @@ impl MultiPartsUploaderScheduler parts.push(uploaded_part), - Err(err) => { - return Err((err, uploaded_size, resumed, Some(initialized))); - } - } - } - } - match scheduler + *uploaded_size = Arc::try_unwrap(atomic_uploaded_size).unwrap().into_inner(); + let resumed = Arc::try_unwrap(atomic_resumed).unwrap().into_inner(); + let parts = results + .into_iter() + .flatten() + .collect::>>() + .map_err(|err| (err, resumed))?; + scheduler .multi_parts_uploader .async_complete_parts(&initialized, &parts) .await - { - Ok(value) => Ok((value, uploaded_size)), - Err(err) => Err((err, uploaded_size, resumed, Some(initialized))), - } + .map_err(|err| (err, resumed)) + } + + async fn _reinitialize_and_upload_again( + scheduler: &ConcurrentMultiPartsUploaderScheduler, + initialized: &mut M::AsyncInitializedParts, + reinitialize_options: ReinitializeOptions, + concurrency: Concurrency, + elapsed: &mut Duration, + uploaded_size: &mut u64, + ) -> Option> { + let begin_at = Instant::now(); + OptionFuture::from( + scheduler + .multi_parts_uploader + .async_reinitialize_parts(initialized, reinitialize_options) + .await + .ok() + .map(|_| _upload_after_reinitialize(scheduler, initialized.to_owned(), concurrency, uploaded_size)), + ) + .await + .tap(|_| { + *elapsed = begin_at.elapsed(); + }) } async fn _upload_after_reinitialize( scheduler: &ConcurrentMultiPartsUploaderScheduler, initialized: M::AsyncInitializedParts, concurrency: Concurrency, - ) -> Result<(Value, u64), (ResponseError, u64)> { + uploaded_size: &mut u64, + ) -> ApiResult { let initialized = Arc::new(initialized); - let uploaded_size = Arc::new(AtomicU64::new(0)); - let any_error = 
Arc::new(AtomicBool::new(false)); + let atomic_uploaded_size = Arc::new(AtomicU64::new(0)); + let atomic_any_error = Arc::new(AtomicBool::new(false)); let results = join_all((0..concurrency.as_usize()).map(|_| { let scheduler = scheduler.to_owned(); let initialized = initialized.to_owned(); - let any_error = any_error.to_owned(); - let uploaded_size = uploaded_size.to_owned(); + let atomic_any_error = atomic_any_error.to_owned(); + let atomic_uploaded_size = atomic_uploaded_size.to_owned(); spawn(async move { let mut parts = Vec::with_capacity(4); - while !any_error.load(Ordering::SeqCst) { + while !atomic_any_error.load(Ordering::SeqCst) { match scheduler .multi_parts_uploader .async_upload_part(&initialized, &scheduler.data_partition_provider) @@ -457,7 +808,7 @@ impl MultiPartsUploaderScheduler { if !uploaded_part.resumed() { - uploaded_size.fetch_add(uploaded_part.size().get(), Ordering::Relaxed); + atomic_uploaded_size.fetch_add(uploaded_part.size().get(), Ordering::Relaxed); } parts.push(Ok(uploaded_part)); } @@ -466,7 +817,7 @@ impl MultiPartsUploaderScheduler { parts.push(Err(err)); - any_error.store(true, Ordering::SeqCst); + atomic_any_error.store(true, Ordering::SeqCst); break; } } @@ -476,19 +827,12 @@ impl MultiPartsUploaderScheduler>>()?; scheduler .multi_parts_uploader .async_complete_parts(&initialized, &parts) .await - .map(|value| (value, uploaded_size)) - .map_err(|err| (err, uploaded_size)) } } } diff --git a/upload-manager/src/scheduler/serial_multi_parts_uploader_scheduler.rs b/upload-manager/src/scheduler/serial_multi_parts_uploader_scheduler.rs index 96bdf0e3..9defc0ec 100644 --- a/upload-manager/src/scheduler/serial_multi_parts_uploader_scheduler.rs +++ b/upload-manager/src/scheduler/serial_multi_parts_uploader_scheduler.rs @@ -1,9 +1,13 @@ use super::{ super::{ - multi_parts_uploader::PartsExpiredError, ConcurrencyProvider, DataPartitionProvider, DataSource, - FixedDataPartitionProvider, MultiPartsUploader, ObjectParams, UploadedPart, + multi_parts_uploader::{MultiPartsUploaderExt, PartsExpiredError}, + ConcurrencyProvider, DataPartitionProvider, DataSource, FixedDataPartitionProvider, MultiPartsUploader, + ObjectParams, ReinitializeOptions, UploadedPart, + }, + utils::{ + keep_original_region_options, need_to_retry, no_region_tried_error, remove_used_region_from_regions, + specify_region_options, UploadPartsError, UploadResumedPartsError, }, - utils::keep_original_region_options, MultiPartsUploaderScheduler, }; use qiniu_apis::http_client::{ApiResult, ResponseError}; @@ -11,7 +15,10 @@ use serde_json::Value; use std::num::NonZeroU64; #[cfg(feature = "async")] -use {super::AsyncDataSource, futures::future::BoxFuture}; +use { + super::AsyncDataSource, + futures::future::{BoxFuture, OptionFuture}, +}; /// 串行分片上传调度器 /// @@ -112,37 +119,127 @@ impl MultiPartsUploaderScheduler for Se } fn upload(&self, source: Box>, params: ObjectParams) -> ApiResult { - return match _upload(self, source, params) { - Ok(value) => Ok(value), - Err((err, true, Some(mut initialized))) if err.extensions().get::().is_some() => { - if self - .multi_parts_uploader - .reinitialize_parts(&mut initialized, keep_original_region_options()) - .is_ok() - { - _upload_after_reinitialize(self, &initialized) - } else { - Err(err) + return match _resume_and_upload(self, source.to_owned(), params.to_owned()) { + None => match _try_to_upload_to_all_regions(self, source, params, None) { + Ok(None) => Err(no_region_tried_error()), + Ok(Some(value)) => Ok(value), + Err(err) => Err(err), + }, + 
Some(Err(UploadPartsError { err, .. })) if !need_to_retry(&err) => Err(err), + Some(Err(UploadPartsError { initialized, err })) => { + match _try_to_upload_to_all_regions(self, source, params, initialized) { + Ok(None) => Err(err), + Ok(Some(value)) => Ok(value), + Err(err) => Err(err), } } - Err((err, _, _)) => Err(err), + Some(Ok(value)) => Ok(value), }; - fn _upload( + fn _resume_and_upload( scheduler: &SerialMultiPartsUploaderScheduler, source: Box>, params: ObjectParams, - ) -> Result)> { - let initialized = scheduler + ) -> Option>> { + _upload_resumed_parts(scheduler, source, params).map(|result| match result { + Ok(value) => Ok(value), + Err(UploadResumedPartsError { + err, + resumed: true, + initialized: Some(mut initialized), + }) if err.extensions().get::().is_some() => { + match _reinitialize_and_upload_again(scheduler, &mut initialized, keep_original_region_options()) { + Some(Ok(value)) => Ok(value), + Some(Err(err)) => Err(UploadPartsError::new(err, Some(initialized))), + None => Err(UploadPartsError::new(err, Some(initialized))), + } + } + Err(UploadResumedPartsError { err, initialized, .. }) => Err(UploadPartsError::new(err, initialized)), + }) + } + + fn _upload_resumed_parts( + scheduler: &SerialMultiPartsUploaderScheduler, + source: Box>, + params: ObjectParams, + ) -> Option>> { + scheduler .multi_parts_uploader - .initialize_parts(source, params) - .map_err(|err| (err, false, None))?; + .try_to_resume_parts(source, params) + .map(|initialized| { + _upload_after_initialize(scheduler, &initialized) + .map_err(|(err, resumed)| UploadResumedPartsError::new(err, resumed, Some(initialized))) + }) + } + + fn _try_to_upload_to_all_regions( + scheduler: &SerialMultiPartsUploaderScheduler, + source: Box>, + params: ObjectParams, + mut initialized: Option, + ) -> ApiResult> { + let mut regions = scheduler + .multi_parts_uploader + .get_bucket_regions(¶ms) + .map(|r| r.into_regions())?; + if let Some(initialized) = &initialized { + remove_used_region_from_regions(&mut regions, initialized); + } + let mut last_err = None; + for region in regions { + let initialized_result = if let Some(mut initialized) = initialized.take() { + scheduler + .multi_parts_uploader + .reinitialize_parts(&mut initialized, specify_region_options(region)) + .map(|_| initialized) + } else { + scheduler + .multi_parts_uploader + .initialize_parts(source.to_owned(), params.to_owned()) + }; + let new_initialized = match initialized_result { + Ok(new_initialized) => { + initialized = Some(new_initialized.to_owned()); + new_initialized + } + Err(err) => { + let to_retry = need_to_retry(&err); + last_err = Some(err); + if to_retry { + continue; + } else { + break; + } + } + }; + match _upload_after_reinitialize(scheduler, &new_initialized) { + Ok(value) => { + return Ok(Some(value)); + } + Err(err) => { + let to_retry = need_to_retry(&err); + last_err = Some(err); + if to_retry { + continue; + } else { + break; + } + } + } + } + last_err.map_or(Ok(None), Err) + } + + fn _upload_after_initialize( + scheduler: &SerialMultiPartsUploaderScheduler, + initialized: &M::InitializedParts, + ) -> Result { let mut parts = Vec::with_capacity(4); let mut resumed = false; loop { match scheduler .multi_parts_uploader - .upload_part(&initialized, &scheduler.data_partition_provider) + .upload_part(initialized, &scheduler.data_partition_provider) { Ok(Some(uploaded_part)) => { if uploaded_part.resumed() { @@ -151,13 +248,25 @@ impl MultiPartsUploaderScheduler for Se parts.push(uploaded_part); } Ok(None) => break, - Err(err) => 
return Err((err, resumed, Some(initialized))), + Err(err) => return Err((err, resumed)), } } - match scheduler.multi_parts_uploader.complete_parts(&initialized, &parts) { - Ok(value) => Ok(value), - Err(err) => Err((err, resumed, Some(initialized))), - } + scheduler + .multi_parts_uploader + .complete_parts(initialized, &parts) + .map_err(|err| (err, resumed)) + } + + fn _reinitialize_and_upload_again( + scheduler: &SerialMultiPartsUploaderScheduler, + initialized: &mut M::InitializedParts, + reinitialize_options: ReinitializeOptions, + ) -> Option> { + scheduler + .multi_parts_uploader + .reinitialize_parts(initialized, reinitialize_options) + .ok() + .map(|_| _upload_after_reinitialize(scheduler, initialized)) } fn _upload_after_reinitialize( @@ -183,40 +292,151 @@ impl MultiPartsUploaderScheduler for Se params: ObjectParams, ) -> BoxFuture> { return Box::pin(async move { - match _upload(self, source, params).await { - Ok(value) => Ok(value), - Err((err, true, Some(mut initialized))) if err.extensions().get::().is_some() => { - if self - .multi_parts_uploader - .async_reinitialize_parts(&mut initialized, keep_original_region_options()) - .await - .is_ok() - { - _upload_after_reinitialize(self, &initialized).await - } else { - Err(err) + match _resume_and_upload(self, source.to_owned(), params.to_owned()).await { + None => match _try_to_upload_to_all_regions(self, source, params, None).await { + Ok(None) => Err(no_region_tried_error()), + Ok(Some(value)) => Ok(value), + Err(err) => Err(err), + }, + Some(Err(UploadPartsError { err, .. })) if !need_to_retry(&err) => Err(err), + Some(Err(UploadPartsError { initialized, err })) => { + match _try_to_upload_to_all_regions(self, source, params, initialized).await { + Ok(None) => Err(err), + Ok(Some(value)) => Ok(value), + Err(err) => Err(err), } } - Err((err, _, _)) => Err(err), + Some(Ok(value)) => Ok(value), } }); - async fn _upload( + async fn _resume_and_upload( + scheduler: &SerialMultiPartsUploaderScheduler, + source: Box>, + params: ObjectParams, + ) -> Option>> { + OptionFuture::from( + _upload_resumed_parts(scheduler, source, params) + .await + .map(|result| async move { + match result { + Ok(value) => Ok(value), + Err(UploadResumedPartsError { + err, + resumed: true, + initialized: Some(mut initialized), + }) if err.extensions().get::().is_some() => { + match _reinitialize_and_upload_again( + scheduler, + &mut initialized, + keep_original_region_options(), + ) + .await + { + Some(Ok(value)) => Ok(value), + Some(Err(err)) => Err(UploadPartsError::new(err, Some(initialized))), + None => Err(UploadPartsError::new(err, Some(initialized))), + } + } + Err(UploadResumedPartsError { err, initialized, .. 
}) => { + Err(UploadPartsError::new(err, initialized)) + } + } + }), + ) + .await + } + + async fn _upload_resumed_parts( + scheduler: &SerialMultiPartsUploaderScheduler, + source: Box>, + params: ObjectParams, + ) -> Option>> { + OptionFuture::from( + scheduler + .multi_parts_uploader + .try_to_async_resume_parts(source, params) + .await + .map(|initialized| async move { + _upload_after_initialize(scheduler, &initialized) + .await + .map_err(|(err, resumed)| UploadResumedPartsError::new(err, resumed, Some(initialized))) + }), + ) + .await + } + + async fn _try_to_upload_to_all_regions( scheduler: &SerialMultiPartsUploaderScheduler, source: Box>, params: ObjectParams, - ) -> Result)> { - let initialized = scheduler + mut initialized: Option, + ) -> ApiResult> { + let mut regions = scheduler .multi_parts_uploader - .async_initialize_parts(source, params) + .async_get_bucket_regions(¶ms) .await - .map_err(|err| (err, false, None))?; + .map(|r| r.into_regions())?; + if let Some(initialized) = &initialized { + remove_used_region_from_regions(&mut regions, initialized); + } + let mut last_err = None; + for region in regions { + let initialized_result = if let Some(mut initialized) = initialized.take() { + scheduler + .multi_parts_uploader + .async_reinitialize_parts(&mut initialized, specify_region_options(region)) + .await + .map(|_| initialized) + } else { + scheduler + .multi_parts_uploader + .async_initialize_parts(source.to_owned(), params.to_owned()) + .await + }; + let new_initialized = match initialized_result { + Ok(new_initialized) => { + initialized = Some(new_initialized.to_owned()); + new_initialized + } + Err(err) => { + let to_retry = need_to_retry(&err); + last_err = Some(err); + if to_retry { + continue; + } else { + break; + } + } + }; + match _upload_after_reinitialize(scheduler, &new_initialized).await { + Ok(value) => { + return Ok(Some(value)); + } + Err(err) => { + let to_retry = need_to_retry(&err); + last_err = Some(err); + if to_retry { + continue; + } else { + break; + } + } + } + } + last_err.map_or(Ok(None), Err) + } + + async fn _upload_after_initialize( + scheduler: &SerialMultiPartsUploaderScheduler, + initialized: &M::AsyncInitializedParts, + ) -> Result { let mut parts = Vec::with_capacity(4); let mut resumed = false; loop { match scheduler .multi_parts_uploader - .async_upload_part(&initialized, &scheduler.data_partition_provider) + .async_upload_part(initialized, &scheduler.data_partition_provider) .await { Ok(Some(uploaded_part)) => { @@ -226,19 +446,30 @@ impl MultiPartsUploaderScheduler for Se parts.push(uploaded_part); } Ok(None) => break, - Err(err) => { - return Err((err, resumed, Some(initialized))); - } + Err(err) => return Err((err, resumed)), } } - match scheduler + scheduler .multi_parts_uploader - .async_complete_parts(&initialized, &parts) + .async_complete_parts(initialized, &parts) .await - { - Ok(value) => Ok(value), - Err(err) => Err((err, resumed, Some(initialized))), - } + .map_err(|err| (err, resumed)) + } + + async fn _reinitialize_and_upload_again( + scheduler: &SerialMultiPartsUploaderScheduler, + initialized: &mut M::AsyncInitializedParts, + reinitialize_options: ReinitializeOptions, + ) -> Option> { + OptionFuture::from( + scheduler + .multi_parts_uploader + .async_reinitialize_parts(initialized, reinitialize_options) + .await + .ok() + .map(|_| _upload_after_reinitialize(scheduler, initialized)), + ) + .await } async fn _upload_after_reinitialize( @@ -283,7 +514,10 @@ mod tests { AsyncRequest, AsyncReset, AsyncResponse, 
AsyncResponseResult, HeaderValue, HttpCaller, StatusCode, SyncRequest, SyncResponseResult, }, - http_client::{AsyncResponseBody, DirectChooser, HttpClient, NeverRetrier, Region, NO_BACKOFF}, + http_client::{ + AsyncResponseBody, DirectChooser, ErrorRetrier, HttpClient, LimitedRetrier, NeverRetrier, Region, + RequestRetrier, StaticRegionsProvider, NO_BACKOFF, + }, }; use qiniu_utils::base64::urlsafe as urlsafe_base64; use rand::{thread_rng, RngCore}; @@ -802,6 +1036,129 @@ mod tests { assert_eq!(caller.upload_part_counts.into_inner(), 1); } + #[derive(Debug, Default)] + struct FakeHttpCaller4 { + init_parts_counts: AtomicUsize, + upload_part_counts: AtomicUsize, + complete_parts_counts: AtomicUsize, + } + + impl HttpCaller for FakeHttpCaller4 { + fn call(&self, _request: &mut SyncRequest<'_>) -> SyncResponseResult { + unreachable!() + } + + #[cfg(feature = "async")] + fn async_call<'a>(&'a self, request: &'a mut AsyncRequest<'_>) -> BoxFuture<'a, AsyncResponseResult> { + Box::pin(async move { + if request.url().path() == "/buckets/fakebucket/objects/~/uploads" { + if request.url().host() == Some("fakeup.example.com") { + assert_eq!(self.init_parts_counts.fetch_add(1, Ordering::Relaxed), 0); + assert_eq!(self.upload_part_counts.load(Ordering::Relaxed), 0); + assert_eq!(self.complete_parts_counts.load(Ordering::Relaxed), 0); + } else { + assert_eq!(self.init_parts_counts.fetch_add(1, Ordering::Relaxed), 1); + assert_eq!(self.upload_part_counts.load(Ordering::Relaxed), 1); + assert_eq!(self.complete_parts_counts.load(Ordering::Relaxed), 1); + } + let resp_body = json_to_vec(&json!({ + "uploadId": "fakeuploadid", + "expireAt": (SystemTime::now() + Duration::from_secs(5)).duration_since(UNIX_EPOCH).unwrap().as_secs(), + })) + .unwrap(); + Ok(AsyncResponse::builder() + .status_code(StatusCode::OK) + .header("x-reqid", HeaderValue::from_static("FakeReqid")) + .body(AsyncResponseBody::from_bytes(resp_body)) + .build()) + } else if request + .url() + .path() + .starts_with("/buckets/fakebucket/objects/~/uploads/fakeuploadid/") + { + let page_number: usize; + scan_text!(request.url().path().bytes() => "/buckets/fakebucket/objects/~/uploads/fakeuploadid/{}", page_number); + assert_eq!(page_number, 1); + let body_len = size_of_async_reader(request.body_mut()).await.unwrap(); + assert_eq!(body_len, BLOCK_SIZE); + if request.url().host() == Some("fakeup.example.com") { + assert_eq!(self.init_parts_counts.load(Ordering::Relaxed), 1); + assert_eq!(self.upload_part_counts.fetch_add(1, Ordering::Relaxed), 0); + assert_eq!(self.complete_parts_counts.load(Ordering::Relaxed), 0); + } else { + assert_eq!(self.init_parts_counts.load(Ordering::Relaxed), 2); + assert_eq!(self.upload_part_counts.fetch_add(1, Ordering::Relaxed), 1); + assert_eq!(self.complete_parts_counts.load(Ordering::Relaxed), 1); + } + let resp_body = json_to_vec(&json!({ + "etag": format!("==={}===", page_number), + "md5": "fake-md5", + })) + .unwrap(); + Ok(AsyncResponse::builder() + .status_code(StatusCode::OK) + .header("x-reqid", HeaderValue::from_static("FakeReqid")) + .body(AsyncResponseBody::from_bytes(resp_body)) + .build()) + } else if request + .url() + .path() + .starts_with("/buckets/fakebucket/objects/~/uploads/fakeuploadid") + { + if request.url().host() == Some("fakeup.example.com") { + assert_eq!(self.init_parts_counts.load(Ordering::Relaxed), 1); + assert_eq!(self.upload_part_counts.load(Ordering::Relaxed), 1); + assert_eq!(self.complete_parts_counts.fetch_add(1, Ordering::Relaxed), 0); + let resp_body = json_to_vec(&json!({ + 
"error": "test error", + })) + .unwrap(); + Ok(AsyncResponse::builder() + .status_code(StatusCode::from_u16(599).unwrap()) + .header("x-reqid", HeaderValue::from_static("FakeReqid")) + .body(AsyncResponseBody::from_bytes(resp_body)) + .build()) + } else { + assert_eq!(self.init_parts_counts.load(Ordering::Relaxed), 2); + assert_eq!(self.upload_part_counts.load(Ordering::Relaxed), 2); + assert_eq!(self.complete_parts_counts.fetch_add(1, Ordering::Relaxed), 1); + let resp_body = json_to_vec(&json!({ + "ok": 1, + })) + .unwrap(); + Ok(AsyncResponse::builder() + .status_code(StatusCode::OK) + .header("x-reqid", HeaderValue::from_static("FakeReqid")) + .body(AsyncResponseBody::from_bytes(resp_body)) + .build()) + } + } else { + unreachable!() + } + }) + } + } + + { + let caller = Arc::new(FakeHttpCaller4::default()); + { + let uploader = SerialMultiPartsUploaderScheduler::new(MultiPartsV2Uploader::new( + get_upload_manager_with_retrier(caller.to_owned(), LimitedRetrier::new(ErrorRetrier, 0)), + FileSystemResumableRecorder::::new(resuming_files_dir.path()), + )); + let file_source = Box::new(AsyncFileDataSource::new(file_path.as_os_str())); + let params = ObjectParams::builder() + .region_provider(double_up_domain_region()) + .build(); + let body = uploader.async_upload(file_source, params).await.unwrap(); + assert_eq!(body.get("ok").unwrap().as_i64(), Some(1)); + } + let caller = Arc::try_unwrap(caller).unwrap(); + assert_eq!(caller.init_parts_counts.into_inner(), 2); + assert_eq!(caller.upload_part_counts.into_inner(), 2); + assert_eq!(caller.complete_parts_counts.into_inner(), 2); + } + Ok(()) } @@ -818,6 +1175,13 @@ mod tests { } fn get_upload_manager(caller: impl HttpCaller + 'static) -> UploadManager { + get_upload_manager_with_retrier(caller, NeverRetrier) + } + + fn get_upload_manager_with_retrier( + caller: impl HttpCaller + 'static, + retrier: impl RequestRetrier + 'static, + ) -> UploadManager { UploadManager::builder(UploadTokenSigner::new_credential_provider( get_credential(), "fakebucket", @@ -826,7 +1190,7 @@ mod tests { .http_client( HttpClient::builder(caller) .chooser(DirectChooser) - .request_retrier(NeverRetrier) + .request_retrier(retrier) .backoff(NO_BACKOFF) .build(), ) @@ -843,6 +1207,16 @@ mod tests { .build() } + fn double_up_domain_region() -> StaticRegionsProvider { + let mut provider = StaticRegionsProvider::new(single_up_domain_region()); + provider.append( + Region::builder("chaotic2") + .add_up_preferred_endpoint(("fakeup.example2.com".to_owned(), 8080).into()) + .build(), + ); + provider + } + fn random_file_path(size: u64) -> IoResult { let mut tempfile = TempfileBuilder::new().tempfile()?; let rng = Box::new(thread_rng()) as Box; diff --git a/upload-manager/src/scheduler/utils.rs b/upload-manager/src/scheduler/utils.rs index 47d53a2d..39ecaba9 100644 --- a/upload-manager/src/scheduler/utils.rs +++ b/upload-manager/src/scheduler/utils.rs @@ -1,5 +1,57 @@ -use super::super::ReinitializeOptions; +use super::super::{InitializedParts, ReinitializeOptions}; +use qiniu_apis::http_client::{Region, RegionsProvider, ResponseError, ResponseErrorKind, RetryDecision}; pub(super) fn keep_original_region_options() -> ReinitializeOptions { ReinitializeOptions::builder().keep_original_region().build() } + +pub(super) fn specify_region_options(regions_provider: impl RegionsProvider + 'static) -> ReinitializeOptions { + ReinitializeOptions::builder() + .regions_provider(regions_provider) + .build() +} + +pub(super) fn need_to_retry(err: &ResponseError) -> bool { + matches!( + 
err.retry_decision(), + Some(RetryDecision::TryNextServer | RetryDecision::RetryRequest | RetryDecision::Throttled) + ) +} + +pub(super) fn no_region_tried_error() -> ResponseError { + ResponseError::new_with_msg(ResponseErrorKind::NoTry, "None region is tried") +} + +pub(super) fn remove_used_region_from_regions(regions: &mut Vec, initialized: &I) { + if let Some(found_idx) = regions.iter().position(|r| r.up().similar(initialized.up_endpoints())) { + regions.remove(found_idx); + } +} + +#[derive(Debug)] +pub(super) struct UploadResumedPartsError { + pub(super) err: ResponseError, + pub(super) resumed: bool, + pub(super) initialized: Option, +} + +impl UploadResumedPartsError { + pub(super) fn new(err: ResponseError, resumed: bool, initialized: Option) -> Self { + Self { + err, + resumed, + initialized, + } + } +} + +pub(super) struct UploadPartsError { + pub(super) err: ResponseError, + pub(super) initialized: Option, +} + +impl UploadPartsError { + pub(super) fn new(err: ResponseError, initialized: Option) -> Self { + Self { err, initialized } + } +} diff --git a/upload-token/Cargo.toml b/upload-token/Cargo.toml index 5b7ddf1d..ef365aaf 100644 --- a/upload-token/Cargo.toml +++ b/upload-token/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "qiniu-upload-token" -version = "0.2.0" +version = "0.2.1" authors = ["Rong Zhou ", "Shanghai Qiniu Information Technologies Co., Ltd."] edition = "2021" rust-version = "1.60.0" @@ -24,8 +24,8 @@ serde = { version = "1.0.130", features = ["derive"] } auto_impl = "1.0.0" futures = { version = "0.3.16", optional = true } -qiniu-credential = { version = "0.2.0", path = "../credential" } -qiniu-utils = { version = "0.2.0", path = "../utils" } +qiniu-credential = { version = "0.2.1", path = "../credential" } +qiniu-utils = { version = "0.2.1", path = "../utils" } [dev-dependencies] async-std = { version = "1.6.3", features = ["attributes"] } diff --git a/upload-token/README.md b/upload-token/README.md index b46ad38d..e263130e 100644 --- a/upload-token/README.md +++ b/upload-token/README.md @@ -24,14 +24,14 @@ ```toml [dependencies] -qiniu-upload-token = "0.2.0" +qiniu-upload-token = "0.2.1" ``` ### 启用异步接口 ```toml [dependencies] -qiniu-upload-token = { version = "0.2.0", features = ["async"] } +qiniu-upload-token = { version = "0.2.1", features = ["async"] } ``` ## 代码示例 diff --git a/upload-token/src/upload_policy.rs b/upload-token/src/upload_policy.rs index 32641db2..a1573540 100644 --- a/upload-token/src/upload_policy.rs +++ b/upload-token/src/upload_policy.rs @@ -863,7 +863,7 @@ mod tests { fn test_build_upload_policy_with_callback() -> Result<()> { let policy = UploadPolicyBuilder::new_policy_for_bucket("test_bucket", Duration::from_secs(3600)) .callback( - &["https://1.1.1.1", "https://2.2.2.2", "https://3.3.3.3"], + ["https://1.1.1.1", "https://2.2.2.2", "https://3.3.3.3"], "www.qiniu.com", "a=b&c=d", "", @@ -890,7 +890,7 @@ mod tests { fn test_build_upload_policy_with_callback_body_with_body_type() -> Result<()> { let policy = UploadPolicyBuilder::new_policy_for_bucket("test_bucket", Duration::from_secs(3600)) .callback( - &["https://1.1.1.1", "https://2.2.2.2", "https://3.3.3.3"], + ["https://1.1.1.1", "https://2.2.2.2", "https://3.3.3.3"], "www.qiniu.com", "a=b&c=d", APPLICATION_WWW_FORM_URLENCODED.as_ref(), @@ -990,7 +990,7 @@ mod tests { #[test] fn test_build_upload_policy_with_mime() -> Result<()> { let policy = UploadPolicyBuilder::new_policy_for_bucket("test_bucket", Duration::from_secs(3600)) - .mime_types(&["image/jpeg", "image/png"]) + 
.mime_types(["image/jpeg", "image/png"]) .build(); assert_eq!( policy.mime_types().map(|ops| ops.collect::>()), diff --git a/utils/Cargo.toml b/utils/Cargo.toml index 3035119b..1d226b0e 100644 --- a/utils/Cargo.toml +++ b/utils/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "qiniu-utils" -version = "0.2.0" +version = "0.2.1" authors = ["Rong Zhou ", "Shanghai Qiniu Information Technologies Co., Ltd."] edition = "2021" rust-version = "1.60.0"