Skip to content

Commit

Permalink
handle markers internally in S3
Browse files Browse the repository at this point in the history
  • Loading branch information
meteorgan committed Sep 19, 2024
1 parent d1fe653 commit 4770211
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 17 deletions.
9 changes: 0 additions & 9 deletions core/src/raw/oio/list/page_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ pub trait PageList: Send + Sync + Unpin + 'static {
///
/// - Set `done` to `true` if all page have been fetched.
/// - Update `token` if there is more page to fetch. `token` is not exposed to users, it's internal used only.
/// - Update `key_marker` and `version_id_marker` if object versioning is enabled and there are more page to fetch.
/// similar to `token`, they should only be internal used
/// - Push back into the entries for each entry fetched from underlying storage.
///
/// NOTE: `entries` is a `VecDeque` to avoid unnecessary memory allocation. Only `push_back` is allowed.
Expand All @@ -55,11 +53,6 @@ pub struct PageContext {
pub done: bool,
/// token is used by underlying storage services to fetch next page.
pub token: String,
/// key_marker and version_id_marker are used together by underlying storage services to fetch
/// next page when object versioning is enabled
pub key_marker: String,
/// version_id_marker is used with key_marker
pub version_id_marker: String,
/// entries are used to store entries fetched from underlying storage.
///
/// Please always reuse the same `VecDeque` to avoid unnecessary memory allocation.
Expand All @@ -85,8 +78,6 @@ where
ctx: PageContext {
done: false,
token: "".to_string(),
key_marker: "".to_string(),
version_id_marker: "".to_string(),
entries: VecDeque::new(),
},
}
Expand Down
27 changes: 19 additions & 8 deletions core/src/services/s3/lister.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ pub struct S3ObjectVersionsLister {
delimiter: &'static str,
limit: Option<usize>,
start_after: String,
abs_start_after: String,
}

impl S3ObjectVersionsLister {
Expand All @@ -155,32 +156,39 @@ impl S3ObjectVersionsLister {
start_after: Option<&str>,
) -> Self {
let delimiter = if recursive { "" } else { "/" };
let start_after = start_after.unwrap_or_default().to_owned();
let abs_start_after = build_abs_path(core.root.as_str(), start_after.as_str());

Self {
core,
prefix: path.to_string(),
delimiter,
limit,
start_after: start_after.map_or("".to_owned(), String::from),
start_after,
abs_start_after,
}
}
}

impl oio::PageList for S3ObjectVersionsLister {
async fn next_page(&self, ctx: &mut PageContext) -> Result<()> {
let key_marker = if ctx.key_marker.is_empty() && !self.start_after.is_empty() {
build_abs_path(&self.core.root, &self.start_after)
let markers = ctx.token.rsplit_once(" ");
let (key_marker, version_id_marker) = if let Some(data) = markers {
data
} else if !self.start_after.is_empty() {
(self.abs_start_after.as_str(), "")
} else {
ctx.key_marker.clone()
("", "")
};

let resp = self
.core
.s3_list_object_versions(
&self.prefix,
self.delimiter,
self.limit,
&key_marker,
&ctx.version_id_marker,
key_marker,
version_id_marker,
)
.await?;
if resp.status() != http::StatusCode::OK {
Expand All @@ -196,8 +204,11 @@ impl oio::PageList for S3ObjectVersionsLister {
} else {
false
};
ctx.key_marker = output.next_key_marker.unwrap_or_default();
ctx.version_id_marker = output.next_version_id_marker.unwrap_or_default();
ctx.token = format!(
"{} {}",
output.next_key_marker.unwrap_or_default(),
output.next_version_id_marker.unwrap_or_default()
);

for prefix in output.common_prefixes {
let de = oio::Entry::new(
Expand Down

0 comments on commit 4770211

Please sign in to comment.