Skip to content

Commit

Permalink
feat: Implement path cache and refactor gdrive (#3975)
Browse files Browse the repository at this point in the history
* Implement path cache

Signed-off-by: Xuanwo <[email protected]>

* feat: Implement path cache and refactor gdrive

Signed-off-by: Xuanwo <[email protected]>

* Fix gdrive build

Signed-off-by: Xuanwo <[email protected]>

* Fix build

Signed-off-by: Xuanwo <[email protected]>

* Fix tests

Signed-off-by: Xuanwo <[email protected]>

* Fix list

Signed-off-by: Xuanwo <[email protected]>

* Fix gdrive

Signed-off-by: Xuanwo <[email protected]>

* Save work

Signed-off-by: Xuanwo <[email protected]>

* Refactor

Signed-off-by: Xuanwo <[email protected]>

* Fix path cache

Signed-off-by: Xuanwo <[email protected]>

* Update cache after write file

Signed-off-by: Xuanwo <[email protected]>

* Update cache with list result

Signed-off-by: Xuanwo <[email protected]>

* Introduce lock for path cache

Signed-off-by: Xuanwo <[email protected]>

* Fix ensure path

Signed-off-by: Xuanwo <[email protected]>

* Fix build

Signed-off-by: Xuanwo <[email protected]>

* Fix build

Signed-off-by: Xuanwo <[email protected]>

* ignore empty path

Signed-off-by: Xuanwo <[email protected]>

* Make sure ensure_path is guarded by lock

Signed-off-by: Xuanwo <[email protected]>

* Fix build

Signed-off-by: Xuanwo <[email protected]>

* Fix deadlock

Signed-off-by: Xuanwo <[email protected]>

* Fix assert

Signed-off-by: Xuanwo <[email protected]>

* Fix path

Signed-off-by: Xuanwo <[email protected]>

* Fix rename

Signed-off-by: Xuanwo <[email protected]>

* Fix typo

Signed-off-by: Xuanwo <[email protected]>

* Polish

Signed-off-by: Xuanwo <[email protected]>

---------

Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo committed Jan 16, 2024
1 parent 4c11ed4 commit f6ddb8e
Show file tree
Hide file tree
Showing 11 changed files with 551 additions and 492 deletions.
6 changes: 5 additions & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ native-tls = ["reqwest/native-tls"]
# Enable vendored native-tls for TLS support
native-tls-vendored = ["reqwest/native-tls-vendored"]

# Enable path cache.
# This is an internal feature, and should not be used by users.
internal-path-cache = ["dep:moka"]

# Enable all layers.
layers-all = [
"layers-chaos",
Expand Down Expand Up @@ -142,7 +146,7 @@ services-gcs = [
"reqsign?/services-google",
"reqsign?/reqwest_request",
]
services-gdrive = []
services-gdrive = ["internal-path-cache"]
services-ghac = []
services-gridfs = ["dep:mongodb"]
services-hdfs = ["dep:hdrs"]
Expand Down
2 changes: 1 addition & 1 deletion core/src/raw/http_util/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ impl HttpClient {
err.is_builder() ||
// Error returned by RedirectPolicy.
//
// We don't set this by hand, just don't allow retry.
// Don't retry if we hit too many redirects.
err.is_redirect() ||
// We never use `Response::error_for_status`, just don't allow retry.
//
Expand Down
5 changes: 5 additions & 0 deletions core/src/raw/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ pub use layer::*;
mod path;
pub use path::*;

#[cfg(feature = "internal-path-cache")]
mod path_cache;
#[cfg(feature = "internal-path-cache")]
pub use path_cache::*;

mod operation;
pub use operation::*;

Expand Down
2 changes: 1 addition & 1 deletion core/src/raw/path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use crate::EntryMode;
use crate::*;

/// build_abs_path will build an absolute path with root.
///
Expand Down
235 changes: 235 additions & 0 deletions core/src/raw/path_cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::raw::*;
use crate::*;
use async_trait::async_trait;
use moka::sync::Cache;
use std::collections::VecDeque;
use tokio::sync::{Mutex, MutexGuard};

/// Backend operations that a [`PathCacher`] needs in order to translate paths into ids.
///
/// On `wasm32` targets the generated futures are not required to be `Send`.
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
pub trait PathQuery {
    /// Return the id of the service's root.
    async fn root(&self) -> Result<String>;

    /// Look up the id of the entry called `name` that lives under `parent_id`.
    ///
    /// `Ok(None)` is interpreted by [`PathCacher`] as "no such entry".
    async fn query(&self, parent_id: &str, name: &str) -> Result<Option<String>>;

    /// Create a directory called `name` under `parent_id` and return the new id.
    async fn create_dir(&self, parent_id: &str, name: &str) -> Result<String>;
}

/// A path-to-id cache sitting in front of a [`PathQuery`] backend.
///
/// OpenDAL addresses data by path, which fits services such as S3 or HDFS. Other services
/// (OneDrive, Google Drive, ...) address files by id instead, and resolving a path into an id
/// is expensive there. `PathCacher` remembers the path -> id mapping so that repeated lookups
/// do not have to hit the service again.
///
/// # Behavior
///
/// Paths stored in the cache are always absolute. With a service root of `/root/`, the file
/// `a/b` is cached as `/root/a/b`.
pub struct PathCacher<Q: PathQuery> {
    query: Q,
    cache: Cache<String, String>,

    /// Optional global lock that serializes cache operations.
    ///
    /// Services like gdrive allow several entries with the same name inside one directory,
    /// so concurrent insertions of the same path must be prevented. The lock is `None`
    /// unless it has been enabled explicitly.
    lock: Option<Mutex<()>>,
}

impl<Q: PathQuery> PathCacher<Q> {
    /// Create a new path cacher that resolves cache misses through `query`.
    ///
    /// The cache is built with `Cache::new(64 * 1024)` and the global lock is disabled;
    /// use [`PathCacher::with_lock`] to enable it.
    pub fn new(query: Q) -> Self {
        Self {
            query,
            cache: Cache::new(64 * 1024),
            lock: None,
        }
    }

    /// Enable the global lock, so that every public operation on this cacher is serialized.
    pub fn with_lock(mut self) -> Self {
        self.lock = Some(Mutex::default());
        self
    }

    /// Acquire the global lock if it is enabled.
    ///
    /// Returns `None` when no lock is configured; otherwise the returned guard must be kept
    /// alive for as long as the critical section lasts.
    async fn lock(&self) -> Option<MutexGuard<'_, ()>> {
        match &self.lock {
            Some(mutex) => Some(mutex.lock().await),
            None => None,
        }
    }

    /// Fetch the root id from the service and remember it under the `/` key.
    ///
    /// Callers are expected to already hold the guard returned by `lock()`; this helper
    /// does not take the lock itself because the tokio mutex is not reentrant.
    async fn load_root(&self) -> Result<String> {
        let root_id = self.query.root().await?;
        self.cache.insert("/".to_string(), root_id.clone());
        Ok(root_id)
    }

    /// Insert a new cache entry mapping `path` to `id`.
    ///
    /// An already cached `path` is left untouched. In debug builds a mismatch between the
    /// cached id and `id` triggers an assertion failure.
    pub async fn insert(&self, path: &str, id: &str) {
        let _guard = self.lock().await;

        // This should never happen, but ignore the insert if it does.
        //
        // A single lookup is used so that the existence check and the consistency check
        // observe the same entry.
        if let Some(existing) = self.cache.get(path) {
            debug_assert!(
                existing == id,
                "path {path} exists but it's value is inconsistent"
            );
            return;
        }

        self.cache.insert(path.to_string(), id.to_string());
    }

    /// Remove the cache entry stored for `path`.
    ///
    /// Only the exact key is invalidated; entries cached for other paths are not touched.
    pub async fn remove(&self, path: &str) {
        let _guard = self.lock().await;

        self.cache.invalidate(path)
    }

    /// Get the id for the given path.
    ///
    /// On a cache miss the path is walked upwards via `get_parent` until a cached ancestor
    /// is found (or the root is reached, in which case the root id is fetched and cached).
    /// From there the remaining segments are resolved one by one through the backend.
    ///
    /// Returns `Ok(None)` if any segment of the path does not exist.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the underlying [`PathQuery`].
    pub async fn get(&self, path: &str) -> Result<Option<String>> {
        let _guard = self.lock().await;

        if let Some(id) = self.cache.get(path) {
            return Ok(Some(id));
        }

        // Segments that still need to be resolved, ordered from the shallowest to the deepest.
        let mut paths = VecDeque::new();
        let mut current_path = path;

        while current_path != "/" && !current_path.is_empty() {
            paths.push_front(current_path.to_string());
            current_path = get_parent(current_path);
            if let Some(id) = self.cache.get(current_path) {
                return self.query_down(&id, paths).await;
            }
        }

        let root_id = self.load_root().await?;
        self.query_down(&root_id, paths).await
    }

    /// Resolve `paths` one after another, starting from the directory identified by `start_id`.
    ///
    /// `paths` is ordered from the shallowest to the deepest entry, e.g.
    /// `["/a/", "/a/b/", "/a/b/c/"]`. Each entry is looked up by its basename under the id
    /// resolved for the previous one, and every successful lookup is cached.
    ///
    /// Returns `Ok(None)` as soon as one entry cannot be found, otherwise the id of the last
    /// entry (or `start_id` itself when `paths` is empty).
    async fn query_down(&self, start_id: &str, paths: VecDeque<String>) -> Result<Option<String>> {
        let mut current_id = start_id.to_string();
        for path in paths {
            let name = get_basename(&path);
            current_id = match self.query.query(&current_id, name).await? {
                Some(id) => {
                    self.cache.insert(path, id.clone());
                    id
                }
                None => return Ok(None),
            };
        }
        Ok(Some(current_id))
    }

    /// Ensure the input dir exists, creating every missing level, and return its id.
    ///
    /// Empty components of `path` are ignored. The cache keys used here are the cumulative
    /// prefixes of the remaining components, each terminated by `/` and without a leading
    /// slash (`a/b` is checked as `a/` and then `a/b/`).
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the underlying [`PathQuery`].
    pub async fn ensure_dir(&self, path: &str) -> Result<String> {
        let _guard = self.lock().await;

        // All parents that need to be checked, from the shallowest to the deepest.
        let mut prefix = String::new();
        let mut parents = Vec::new();
        for component in path.split('/').filter(|c| !c.is_empty()) {
            prefix.push_str(component);
            prefix.push('/');
            parents.push(prefix.clone());
        }

        // Cache the root id on a miss so that later calls do not query the service again.
        let mut parent_id = match self.cache.get("/") {
            Some(id) => id,
            None => self.load_root().await?,
        };

        for parent in parents {
            if let Some(id) = self.cache.get(&parent) {
                parent_id = id;
                continue;
            }

            let name = get_basename(&parent);
            let id = match self.query.query(&parent_id, name).await? {
                Some(id) => id,
                // The dir is missing on the service: create it under the current parent.
                None => self.query.create_dir(&parent_id, name).await?,
            };
            self.cache.insert(parent, id.clone());
            parent_id = id;
        }

        Ok(parent_id)
    }
}

#[cfg(test)]
mod tests {
    use crate::raw::{PathCacher, PathQuery};
    use crate::*;
    use async_trait::async_trait;

    /// Stub backend whose ids are simply the parent id followed by the entry name.
    ///
    /// Names starting with `not_exist` are reported as missing by `query`.
    struct TestQuery {}

    #[async_trait]
    impl PathQuery for TestQuery {
        async fn root(&self) -> Result<String> {
            Ok(String::from("root/"))
        }

        async fn query(&self, parent_id: &str, name: &str) -> Result<Option<String>> {
            let exists = !name.starts_with("not_exist");
            Ok(exists.then(|| format!("{parent_id}{name}")))
        }

        async fn create_dir(&self, parent_id: &str, name: &str) -> Result<String> {
            Ok([parent_id, name].concat())
        }
    }

    /// `get` resolves every case against a fresh, empty cache.
    #[tokio::test]
    async fn test_path_cacher_get() {
        let cases = [
            ("root", "/", Some("root/")),
            ("normal path", "/a", Some("root/a")),
            ("not exist normal dir", "/not_exist/a", None),
            ("not exist normal file", "/a/b/not_exist", None),
            ("nest path", "/a/b/c/d", Some("root/a/b/c/d")),
        ];

        for (name, input, expect) in cases {
            let cacher = PathCacher::new(TestQuery {});

            let resolved = cacher.get(input).await.unwrap();
            assert_eq!(resolved.as_deref(), expect, "{}", name)
        }
    }
}
Loading

0 comments on commit f6ddb8e

Please sign in to comment.