Skip to content

Commit

Permalink
feat: Implement RFC-2774 Lister API (#2787)
Browse files Browse the repository at this point in the history
* Save work

Signed-off-by: Xuanwo <[email protected]>

* Save work

Signed-off-by: Xuanwo <[email protected]>

* Save work

Signed-off-by: Xuanwo <[email protected]>

* remove unexpected change

Signed-off-by: Xuanwo <[email protected]>

* polish

Signed-off-by: Xuanwo <[email protected]>

* Fix build

Signed-off-by: Xuanwo <[email protected]>

* Fix build

Signed-off-by: Xuanwo <[email protected]>

* Polish docs

Signed-off-by: Xuanwo <[email protected]>

* Update rfc

Signed-off-by: Xuanwo <[email protected]>

* Fix

Signed-off-by: Xuanwo <[email protected]>

---------

Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo authored Aug 6, 2023
1 parent b0f4d1b commit 338a8ae
Show file tree
Hide file tree
Showing 17 changed files with 221 additions and 87 deletions.
2 changes: 1 addition & 1 deletion bin/oay/src/services/s3/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ async fn handle_list_objects(

let mut lister = state
.op
.list_with(&params.prefix)
.lister_with(&params.prefix)
.start_after(&params.start_after)
.await?;

Expand Down
2 changes: 1 addition & 1 deletion bin/oay/src/services/webdav/webdavfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl DavFileSystem for WebdavFs {
) -> dav_server::fs::FsFuture<dav_server::fs::FsStream<Box<dyn dav_server::fs::DavDirEntry>>>
{
async move {
let lister = self.op.list(path.as_url_string().as_str()).await.unwrap();
let lister = self.op.lister(path.as_url_string().as_str()).await.unwrap();
Ok(DavStream::new(self.op.clone(), lister).boxed())
}
.boxed()
Expand Down
2 changes: 1 addition & 1 deletion bin/oli/src/commands/cp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ pub async fn main(args: &ArgMatches) -> Result<()> {
}

let dst_root = Path::new(&dst_path);
let mut ds = src_op.scan(&src_path).await?;
let mut ds = src_op.lister_with(&src_path).delimiter("").await?;
while let Some(de) = ds.try_next().await? {
let meta = src_op.metadata(&de, Metakey::Mode).await?;
if meta.mode().is_dir() {
Expand Down
4 changes: 2 additions & 2 deletions bin/oli/src/commands/ls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ pub async fn main(args: &ArgMatches) -> Result<()> {
let (op, path) = cfg.parse_location(target)?;

if !recursive {
let mut ds = op.list(&path).await?;
let mut ds = op.lister(&path).await?;
while let Some(de) = ds.try_next().await? {
println!("{}", de.name());
}
return Ok(());
}

let mut ds = op.scan(&path).await?;
let mut ds = op.lister_with(&path).delimiter("").await?;
while let Some(de) = ds.try_next().await? {
println!("{}", de.path());
}
Expand Down
1 change: 1 addition & 0 deletions bindings/nodejs/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ export class Operator {
* An error will be returned if given path doesn't end with /.
*
* ### Example
*
* ```javascript
* const lister = await op.scan("/path/to/dir/");
* while (true) {
Expand Down
13 changes: 11 additions & 2 deletions bindings/nodejs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ impl Operator {
/// An error will be returned if given path doesn't end with /.
///
/// ### Example
///
/// ```javascript
/// const lister = await op.scan("/path/to/dir/");
/// while (true) {
Expand All @@ -303,7 +304,13 @@ impl Operator {
/// `````
#[napi]
pub async fn scan(&self, path: String) -> Result<Lister> {
Ok(Lister(self.0.scan(&path).await.map_err(format_napi_error)?))
Ok(Lister(
self.0
.lister_with(&path)
.delimiter("")
.await
.map_err(format_napi_error)?,
))
}

/// List dir in flat way synchronously.
Expand Down Expand Up @@ -408,7 +415,9 @@ impl Operator {
/// ```
#[napi]
pub async fn list(&self, path: String) -> Result<Lister> {
Ok(Lister(self.0.list(&path).await.map_err(format_napi_error)?))
Ok(Lister(
self.0.lister(&path).await.map_err(format_napi_error)?,
))
}

/// List given path synchronously.
Expand Down
5 changes: 3 additions & 2 deletions bindings/object_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ impl ObjectStore for OpendalStore {
let path = prefix.map_or("".into(), |x| format!("{}/", x));
let stream = self
.inner
.scan(&path)
.lister_with(&path)
.delimiter("")
.await
.map_err(|err| format_object_store_error(err, &path))?;

Expand All @@ -170,7 +171,7 @@ impl ObjectStore for OpendalStore {
let path = prefix.map_or("".into(), |x| format!("{}/", x));
let mut stream = self
.inner
.list(&path)
.lister(&path)
.await
.map_err(|err| format_object_store_error(err, &path))?;

Expand Down
8 changes: 6 additions & 2 deletions bindings/python/src/asyncio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ impl AsyncOperator {
pub fn list<'p>(&'p self, py: Python<'p>, path: String) -> PyResult<&'p PyAny> {
let this = self.0.clone();
future_into_py(py, async move {
let lister = this.list(&path).await.map_err(format_pyerr)?;
let lister = this.lister(&path).await.map_err(format_pyerr)?;
let pylister: PyObject = Python::with_gil(|py| AsyncLister::new(lister).into_py(py));
Ok(pylister)
})
Expand All @@ -149,7 +149,11 @@ impl AsyncOperator {
pub fn scan<'p>(&'p self, py: Python<'p>, path: String) -> PyResult<&'p PyAny> {
let this = self.0.clone();
future_into_py(py, async move {
let lister = this.scan(&path).await.map_err(format_pyerr)?;
let lister = this
.lister_with(&path)
.delimiter("")
.await
.map_err(format_pyerr)?;
let pylister: PyObject = Python::with_gil(|py| AsyncLister::new(lister).into_py(py));
Ok(pylister)
})
Expand Down
1 change: 1 addition & 0 deletions core/src/docs/rfcs/2774_lister_api.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ We will:

- Rename existing `list` to `lister`
- Add new `list` method to call `lister` and return all entries
- Merge `scan` into `list_with` with `delimiter("")`

This keeps the pagination logic encapsulated in `lister`.

Expand Down
12 changes: 12 additions & 0 deletions core/src/docs/upgrade.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,15 @@
# Unreleased

## Public API

### RFC-2774 Lister API

RFC-2774 proposes a new `lister` API to replace current `list` and `scan`. And we add a new API `list` to return entries directly.

- For listing a directory at once, please use `list()` for convenience.
- For listing a directory recursively, please use `list_with().delimiter("")` or `lister_with().delimiter("")` instead of `scan()`.
- For listing in streaming, please use `lister()` or `lister_with()` instead.

# Upgrade to v0.39

## Public API
Expand Down
2 changes: 1 addition & 1 deletion core/src/layers/complete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1014,7 +1014,7 @@ mod tests {
capability_test!(rename, |op| {
op.rename("/path/to/mock_file", "/path/to/mock_file_2")
});
capability_test!(list, |op| { op.list("/path/to/mock_dir/") });
capability_test!(list, |op| { op.lister("/path/to/mock_dir/") });
capability_test!(presign, |op| {
op.presign_read("/path/to/mock_file", Duration::from_secs(1))
});
Expand Down
10 changes: 5 additions & 5 deletions core/src/layers/immutable_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ mod tests {

let mut map = HashMap::new();
let mut set = HashSet::new();
let mut ds = op.list("").await?;
let mut ds = op.lister("").await?;
while let Some(entry) = ds.try_next().await? {
debug!("got entry: {}", entry.path());
assert!(
Expand Down Expand Up @@ -341,7 +341,7 @@ mod tests {
.layer(iil)
.finish();

let mut ds = op.scan("/").await?;
let mut ds = op.lister_with("/").delimiter("").await?;
let mut set = HashSet::new();
let mut map = HashMap::new();
while let Some(entry) = ds.try_next().await? {
Expand Down Expand Up @@ -391,7 +391,7 @@ mod tests {
// List /
let mut map = HashMap::new();
let mut set = HashSet::new();
let mut ds = op.list("/").await?;
let mut ds = op.lister("/").await?;
while let Some(entry) = ds.try_next().await? {
assert!(
set.insert(entry.path().to_string()),
Expand All @@ -410,7 +410,7 @@ mod tests {
// List dataset/stateful/
let mut map = HashMap::new();
let mut set = HashSet::new();
let mut ds = op.list("dataset/stateful/").await?;
let mut ds = op.lister("dataset/stateful/").await?;
while let Some(entry) = ds.try_next().await? {
assert!(
set.insert(entry.path().to_string()),
Expand Down Expand Up @@ -452,7 +452,7 @@ mod tests {
.layer(iil)
.finish();

let mut ds = op.scan("/").await?;
let mut ds = op.lister_with("/").delimiter("").await?;

let mut map = HashMap::new();
let mut set = HashSet::new();
Expand Down
2 changes: 1 addition & 1 deletion core/src/layers/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1356,7 +1356,7 @@ mod tests {
let expected = vec!["hello", "world", "2023/", "0208/"];

let mut lister = op
.list("retryable_error/")
.lister("retryable_error/")
.await
.expect("service must support list");
let mut actual = Vec::new();
Expand Down
Loading

0 comments on commit 338a8ae

Please sign in to comment.