diff --git a/core/src/raw/oio/list/page_list.rs b/core/src/raw/oio/list/page_list.rs index bc4305224ae..897a54f4939 100644 --- a/core/src/raw/oio/list/page_list.rs +++ b/core/src/raw/oio/list/page_list.rs @@ -45,8 +45,6 @@ pub trait PageList: Send + Sync + Unpin + 'static { /// /// - Set `done` to `true` if all page have been fetched. /// - Update `token` if there is more page to fetch. `token` is not exposed to users, it's internal used only. -/// - Update `key_marker` and `version_id_marker` if object versioning is enabled and there are more page to fetch. -/// similar to `token`, they should only be internal used /// - Push back into the entries for each entry fetched from underlying storage. /// /// NOTE: `entries` is a `VecDeque` to avoid unnecessary memory allocation. Only `push_back` is allowed. @@ -55,11 +53,6 @@ pub struct PageContext { pub done: bool, /// token is used by underlying storage services to fetch next page. pub token: String, - /// key_marker and version_id_marker are used together by underlying storage services to fetch - /// next page when object versioning is enabled - pub key_marker: String, - /// version_id_marker is used with key_marker - pub version_id_marker: String, /// entries are used to store entries fetched from underlying storage. /// /// Please always reuse the same `VecDeque` to avoid unnecessary memory allocation. @@ -85,8 +78,6 @@ where ctx: PageContext { done: false, token: "".to_string(), - key_marker: "".to_string(), - version_id_marker: "".to_string(), entries: VecDeque::new(), }, } diff --git a/core/src/services/s3/lister.rs b/core/src/services/s3/lister.rs index 4a9f067df09..621d82c2243 100644 --- a/core/src/services/s3/lister.rs +++ b/core/src/services/s3/lister.rs @@ -144,6 +144,7 @@ pub struct S3ObjectVersionsLister { delimiter: &'static str, limit: Option, start_after: String, + abs_start_after: String, } impl S3ObjectVersionsLister { @@ -155,32 +156,39 @@ impl S3ObjectVersionsLister { start_after: Option<&str>, ) -> Self { let delimiter = if recursive { "" } else { "/" }; + let start_after = start_after.unwrap_or_default().to_owned(); + let abs_start_after = build_abs_path(core.root.as_str(), start_after.as_str()); Self { core, prefix: path.to_string(), delimiter, limit, - start_after: start_after.map_or("".to_owned(), String::from), + start_after, + abs_start_after, } } } impl oio::PageList for S3ObjectVersionsLister { async fn next_page(&self, ctx: &mut PageContext) -> Result<()> { - let key_marker = if ctx.key_marker.is_empty() && !self.start_after.is_empty() { - build_abs_path(&self.core.root, &self.start_after) + let markers = ctx.token.rsplit_once(" "); + let (key_marker, version_id_marker) = if let Some(data) = markers { + data + } else if !self.start_after.is_empty() { + (self.abs_start_after.as_str(), "") } else { - ctx.key_marker.clone() + ("", "") }; + let resp = self .core .s3_list_object_versions( &self.prefix, self.delimiter, self.limit, - &key_marker, - &ctx.version_id_marker, + key_marker, + version_id_marker, ) .await?; if resp.status() != http::StatusCode::OK { @@ -196,8 +204,11 @@ impl oio::PageList for S3ObjectVersionsLister { } else { false }; - ctx.key_marker = output.next_key_marker.unwrap_or_default(); - ctx.version_id_marker = output.next_version_id_marker.unwrap_or_default(); + ctx.token = format!( + "{} {}", + output.next_key_marker.unwrap_or_default(), + output.next_version_id_marker.unwrap_or_default() + ); for prefix in output.common_prefixes { let de = oio::Entry::new(