From f6ddb8e40d47c8eb17c04e92c289b2019350d700 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Tue, 16 Jan 2024 22:55:03 +0800 Subject: [PATCH] feat: Implement path cache and refactor gdrive (#3975) * Implement path cache Signed-off-by: Xuanwo * feat: Implement path cache and refactor gdrive Signed-off-by: Xuanwo * Fix gdrive build Signed-off-by: Xuanwo * Fix build Signed-off-by: Xuanwo * Fix tests Signed-off-by: Xuanwo * Fix list Signed-off-by: Xuanwo * Fix gdrive Signed-off-by: Xuanwo * Save work Signed-off-by: Xuanwo * Refactor Signed-off-by: Xuanwo * FIx path cache Signed-off-by: Xuanwo * Update cache after write file Signed-off-by: Xuanwo * Update cache with list result Signed-off-by: Xuanwo * Introduce lock for path cache Signed-off-by: Xuanwo * Fix ensure path Signed-off-by: Xuanwo * Fix build Signed-off-by: Xuanwo * Fix build Signed-off-by: Xuanwo * ignore empty path Signed-off-by: Xuanwo * Make sure ensure_path is guard by lock Signed-off-by: Xuanwo * Fix build Signed-off-by: Xuanwo * FIx deadlock Signed-off-by: Xuanwo * Fix assert Signed-off-by: Xuanwo * Fix path Signed-off-by: Xuanwo * Fix rename Signed-off-by: Xuanwo * Fix typo Signed-off-by: Xuanwo * Polish Signed-off-by: Xuanwo --------- Signed-off-by: Xuanwo --- core/Cargo.toml | 6 +- core/src/raw/http_util/client.rs | 2 +- core/src/raw/mod.rs | 5 + core/src/raw/path.rs | 2 +- core/src/raw/path_cache.rs | 235 +++++++++++++++ core/src/services/gdrive/backend.rs | 228 ++++++--------- core/src/services/gdrive/builder.rs | 33 ++- core/src/services/gdrive/core.rs | 430 ++++++++++------------------ core/src/services/gdrive/lister.rs | 12 +- core/src/services/gdrive/writer.rs | 74 ++--- core/tests/behavior/async_list.rs | 16 +- 11 files changed, 551 insertions(+), 492 deletions(-) create mode 100644 core/src/raw/path_cache.rs diff --git a/core/Cargo.toml b/core/Cargo.toml index 5fbe2a7369b..87c3b32d2f6 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -72,6 +72,10 @@ native-tls = ["reqwest/native-tls"] # Enable vendored native-tls for TLS support native-tls-vendored = ["reqwest/native-tls-vendored"] +# Enable path cache. +# This is an internal feature, and should not be used by users. +internal-path-cache = ["dep:moka"] + # Enable all layers. layers-all = [ "layers-chaos", @@ -142,7 +146,7 @@ services-gcs = [ "reqsign?/services-google", "reqsign?/reqwest_request", ] -services-gdrive = [] +services-gdrive = ["internal-path-cache"] services-ghac = [] services-gridfs = ["dep:mongodb"] services-hdfs = ["dep:hdrs"] diff --git a/core/src/raw/http_util/client.rs b/core/src/raw/http_util/client.rs index ea1e9ea0b8b..f83c1884098 100644 --- a/core/src/raw/http_util/client.rs +++ b/core/src/raw/http_util/client.rs @@ -148,7 +148,7 @@ impl HttpClient { err.is_builder() || // Error returned by RedirectPolicy. // - // We don't set this by hand, just don't allow retry. + // Don't retry error if we redirect too many. err.is_redirect() || // We never use `Response::error_for_status`, just don't allow retry. // diff --git a/core/src/raw/mod.rs b/core/src/raw/mod.rs index ee8feac2608..421f56a0daf 100644 --- a/core/src/raw/mod.rs +++ b/core/src/raw/mod.rs @@ -35,6 +35,11 @@ pub use layer::*; mod path; pub use path::*; +#[cfg(feature = "internal-path-cache")] +mod path_cache; +#[cfg(feature = "internal-path-cache")] +pub use path_cache::*; + mod operation; pub use operation::*; diff --git a/core/src/raw/path.rs b/core/src/raw/path.rs index cdc88d0320d..db4d047616c 100644 --- a/core/src/raw/path.rs +++ b/core/src/raw/path.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::EntryMode; +use crate::*; /// build_abs_path will build an absolute path with root. /// diff --git a/core/src/raw/path_cache.rs b/core/src/raw/path_cache.rs new file mode 100644 index 00000000000..45efbf86917 --- /dev/null +++ b/core/src/raw/path_cache.rs @@ -0,0 +1,235 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::raw::*; +use crate::*; +use async_trait::async_trait; +use moka::sync::Cache; +use std::collections::VecDeque; +use tokio::sync::{Mutex, MutexGuard}; + +/// The trait required for path cacher. +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +pub trait PathQuery { + /// Fetch the id for the root of the service. + async fn root(&self) -> Result; + /// Query the id by parent_id and name. + async fn query(&self, parent_id: &str, name: &str) -> Result>; + /// Create a dir by parent_id and name. + async fn create_dir(&self, parent_id: &str, name: &str) -> Result; +} + +/// PathCacher is a cache for path query. +/// +/// OpenDAL is designed for path based storage systems, such as S3, HDFS, etc. But there are many +/// services that are not path based, such as OneDrive, Google Drive, etc. For these services, we +/// lookup files based on id. The lookup of id is very expensive, so we cache the path to id mapping +/// in PathCacher. +/// +/// # Behavior +/// +/// The `path` in the cache is always an absolute one. For example, if the service root is `/root/`, +/// then the path of file `a/b` in cache will be `/root/a/b`. +pub struct PathCacher { + query: Q, + cache: Cache, + + /// This optional lock here is used to prevent concurrent insertions of the same path. + /// + /// Some services like gdrive allows the same name to exist in the same directory. We need to introduce + /// a global lock to prevent concurrent insertions of the same path. + lock: Option>, +} + +impl PathCacher { + /// Create a new path cacher. + pub fn new(query: Q) -> Self { + Self { + query, + cache: Cache::new(64 * 1024), + lock: None, + } + } + + /// Enable the lock for the path cacher. + pub fn with_lock(mut self) -> Self { + self.lock = Some(Mutex::default()); + self + } + + async fn lock(&self) -> Option> { + if let Some(l) = &self.lock { + Some(l.lock().await) + } else { + None + } + } + + /// Insert a new cache entry. + pub async fn insert(&self, path: &str, id: &str) { + let _guard = self.lock().await; + + // This should never happen, but let's ignore the insert if happened. + if self.cache.contains_key(path) { + debug_assert!( + self.cache.get(path) == Some(id.to_string()), + "path {path} exists but it's value is inconsistent" + ); + return; + } + + self.cache.insert(path.to_string(), id.to_string()); + } + + /// Remove a cache entry. + pub async fn remove(&self, path: &str) { + let _guard = self.lock().await; + + self.cache.invalidate(path) + } + + /// Get the id for the given path. + pub async fn get(&self, path: &str) -> Result> { + let _guard = self.lock().await; + + if let Some(id) = self.cache.get(path) { + return Ok(Some(id)); + } + + let mut paths = VecDeque::new(); + let mut current_path = path; + + while current_path != "/" && !current_path.is_empty() { + paths.push_front(current_path.to_string()); + current_path = get_parent(current_path); + if let Some(id) = self.cache.get(current_path) { + return self.query_down(&id, paths).await; + } + } + + let root_id = self.query.root().await?; + self.cache.insert("/".to_string(), root_id.clone()); + self.query_down(&root_id, paths).await + } + + /// `start_id` is the `file_id` to the start dir to query down. + /// `paths` is in the order like `["/a/", "/a/b/", "/a/b/c/"]`. + /// + /// We should fetch the next `file_id` by sending `query`. + async fn query_down(&self, start_id: &str, paths: VecDeque) -> Result> { + let mut current_id = start_id.to_string(); + for path in paths.into_iter() { + let name = get_basename(&path); + current_id = match self.query.query(¤t_id, name).await? { + Some(id) => { + self.cache.insert(path, id.clone()); + id + } + None => return Ok(None), + }; + } + Ok(Some(current_id)) + } + + /// Ensure input dir exists. + pub async fn ensure_dir(&self, path: &str) -> Result { + let _guard = self.lock().await; + + let mut tmp = "".to_string(); + // All parents that need to check. + let mut parents = vec![]; + for component in path.split('/') { + if component.is_empty() { + continue; + } + + tmp.push_str(component); + tmp.push('/'); + parents.push(tmp.to_string()); + } + + let mut parent_id = match self.cache.get("/") { + Some(v) => v, + None => self.query.root().await?, + }; + for parent in parents { + parent_id = match self.cache.get(&parent) { + Some(value) => value, + None => { + let value = match self.query.query(&parent_id, get_basename(&parent)).await? { + Some(value) => value, + None => { + self.query + .create_dir(&parent_id, get_basename(&parent)) + .await? + } + }; + self.cache.insert(parent, value.clone()); + value + } + } + } + + Ok(parent_id) + } +} + +#[cfg(test)] +mod tests { + use crate::raw::{PathCacher, PathQuery}; + use crate::*; + use async_trait::async_trait; + + struct TestQuery {} + + #[async_trait] + impl PathQuery for TestQuery { + async fn root(&self) -> Result { + Ok("root/".to_string()) + } + + async fn query(&self, parent_id: &str, name: &str) -> Result> { + if name.starts_with("not_exist") { + return Ok(None); + } + Ok(Some(format!("{parent_id}{name}"))) + } + + async fn create_dir(&self, parent_id: &str, name: &str) -> Result { + Ok(format!("{parent_id}{name}")) + } + } + + #[tokio::test] + async fn test_path_cacher_get() { + let cases = vec![ + ("root", "/", Some("root/")), + ("normal path", "/a", Some("root/a")), + ("not exist normal dir", "/not_exist/a", None), + ("not exist normal file", "/a/b/not_exist", None), + ("nest path", "/a/b/c/d", Some("root/a/b/c/d")), + ]; + + for (name, input, expect) in cases { + let cache = PathCacher::new(TestQuery {}); + + let actual = cache.get(input).await.unwrap(); + assert_eq!(actual.as_deref(), expect, "{}", name) + } + } +} diff --git a/core/src/services/gdrive/backend.rs b/core/src/services/gdrive/backend.rs index 50a2531a299..ed9ed8354ea 100644 --- a/core/src/services/gdrive/backend.rs +++ b/core/src/services/gdrive/backend.rs @@ -63,13 +63,9 @@ impl Accessor for GdriveBackend { write: true, create_dir: true, - - rename: true, - delete: true, - + rename: true, copy: true, - ..Default::default() }); @@ -77,22 +73,8 @@ impl Accessor for GdriveBackend { } async fn create_dir(&self, path: &str, _args: OpCreateDir) -> Result { - let parent = self.core.ensure_parent_path(path).await?; - - // Make sure `/` has been trimmed. - let path = get_basename(path).trim_end_matches('/'); - - // As Google Drive allows files have the same name, we need to check if the folder exists. - let folder_id = self.core.gdrive_search_folder(&parent, path).await?; - - let id = if let Some(id) = folder_id { - id - } else { - self.core.gdrive_create_folder(&parent, path).await? - }; - - let mut cache = self.core.path_cache.lock().await; - cache.insert(build_abs_path(&self.core.root, path), id); + let path = build_abs_path(&self.core.root, path); + let _ = self.core.path_cache.ensure_dir(&path).await?; Ok(RpCreateDir::default()) } @@ -104,7 +86,25 @@ impl Accessor for GdriveBackend { return Err(parse_error(resp).await?); } - let meta = self.parse_metadata(resp.into_body().bytes().await?)?; + let bs = resp.into_body().bytes().await?; + let gdrive_file: GdriveFile = + serde_json::from_slice(&bs).map_err(new_json_deserialize_error)?; + + if gdrive_file.mime_type == "application/vnd.google-apps.folder" { + return Ok(RpStat::new(Metadata::new(EntryMode::DIR))); + }; + + let mut meta = Metadata::new(EntryMode::FILE); + if let Some(v) = gdrive_file.size { + meta = meta.with_content_length(v.parse::().map_err(|e| { + Error::new(ErrorKind::Unexpected, "parse content length").set_source(e) + })?); + } + if let Some(v) = gdrive_file.modified_time { + meta = meta.with_last_modified(v.parse::>().map_err(|e| { + Error::new(ErrorKind::Unexpected, "parse last modified time").set_source(e) + })?); + } Ok(RpStat::new(meta)) } @@ -127,108 +127,70 @@ impl Accessor for GdriveBackend { } async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> { + let path = build_abs_path(&self.core.root, path); + // As Google Drive allows files have the same name, we need to check if the file exists. // If the file exists, we will keep its ID and update it. - let mut file_id: Option = None; - - let resp = self.core.gdrive_stat(path).await; - // We don't care about the error here. - // As long as the file doesn't exist, we will create a new one. - if let Ok(resp) = resp { - let status = resp.status(); - - if status == StatusCode::OK { - let body = resp.into_body().bytes().await?; - let meta = serde_json::from_slice::(&body) - .map_err(new_json_deserialize_error)?; - - file_id = if meta.id.is_empty() { - None - } else { - Some(meta.id) - }; - } - } + let file_id = self.core.path_cache.get(&path).await?; Ok(( RpWrite::default(), - oio::OneShotWriter::new(GdriveWriter::new( - self.core.clone(), - String::from(path), - file_id, - )), + oio::OneShotWriter::new(GdriveWriter::new(self.core.clone(), path, file_id)), )) } async fn delete(&self, path: &str, _: OpDelete) -> Result { - let resp = self.core.gdrive_delete(path).await; - if let Ok(resp) = resp { - let status = resp.status(); - - match status { - StatusCode::NO_CONTENT | StatusCode::NOT_FOUND => { - let mut cache = self.core.path_cache.lock().await; - - cache.remove(&build_abs_path(&self.core.root, path)); - - return Ok(RpDelete::default()); - } - _ => return Err(parse_error(resp).await?), - } + let path = build_abs_path(&self.core.root, path); + let file_id = self.core.path_cache.get(&path).await?; + let file_id = if let Some(id) = file_id { + id + } else { + return Ok(RpDelete::default()); }; - let e = resp.err().unwrap(); - if e.kind() == ErrorKind::NotFound { - Ok(RpDelete::default()) - } else { - Err(e) + let resp = self.core.gdrive_delete(&file_id).await?; + let status = resp.status(); + if status != StatusCode::NO_CONTENT && status != StatusCode::NOT_FOUND { + return Err(parse_error(resp).await?); } + + self.core.path_cache.remove(&path).await; + resp.into_body().consume().await?; + return Ok(RpDelete::default()); } async fn list(&self, path: &str, _args: OpList) -> Result<(RpList, Self::Lister)> { - let l = GdriveLister::new(path.into(), self.core.clone()); + let path = build_abs_path(&self.core.root, path); + let l = GdriveLister::new(path, self.core.clone()); Ok((RpList::default(), oio::PageLister::new(l))) } async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result { - let from_file_id = self - .core - .get_file_id_by_path(from) - .await? - .ok_or(Error::new(ErrorKind::NotFound, "invalid 'from' path"))?; + let from = build_abs_path(&self.core.root, from); - // split `to` into parent and name according to the last `/` - let mut to_path_items: Vec<&str> = to.split('/').filter(|&x| !x.is_empty()).collect(); + let from_file_id = self.core.path_cache.get(&from).await?.ok_or(Error::new( + ErrorKind::NotFound, + "the file to copy does not exist", + ))?; - let to_name = if let Some(name) = to_path_items.pop() { - name - } else { - return Err(Error::new(ErrorKind::InvalidInput, "invalid 'to' path")); - }; - - let to_parent = to_path_items.join("/") + "/"; - - let to_parent_id = - if let Some(id) = self.core.get_file_id_by_path(to_parent.as_str()).await? { - id - } else { - self.create_dir(&to_parent, OpCreateDir::new()).await?; - self.core - .get_file_id_by_path(to_parent.as_str()) - .await? - .ok_or_else(|| { - Error::new(ErrorKind::Unexpected, "create to's parent folder failed") - })? - }; + let to_name = get_basename(to); + let to_path = build_abs_path(&self.core.root, to); + let to_parent_id = self + .core + .path_cache + .ensure_dir(get_parent(&to_path)) + .await?; // copy will overwrite `to`, delete it if exist - if self - .core - .get_file_id_by_path(to) - .await - .is_ok_and(|id| id.is_some()) - { - self.delete(to, OpDelete::new()).await?; + if let Some(id) = self.core.path_cache.get(&to_path).await? { + let resp = self.core.gdrive_delete(&id).await?; + let status = resp.status(); + if status != StatusCode::NO_CONTENT && status != StatusCode::NOT_FOUND { + return Err(parse_error(resp).await?); + } + + self.core.path_cache.remove(&to_path).await; + resp.into_body().consume().await?; } let url = format!( @@ -256,7 +218,25 @@ impl Accessor for GdriveBackend { } async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result { - let resp = self.core.gdrive_patch_metadata_request(from, to).await?; + let source = build_abs_path(&self.core.root, from); + let target = build_abs_path(&self.core.root, to); + + // rename will overwrite `to`, delete it if exist + if let Some(id) = self.core.path_cache.get(&target).await? { + let resp = self.core.gdrive_delete(&id).await?; + let status = resp.status(); + if status != StatusCode::NO_CONTENT && status != StatusCode::NOT_FOUND { + return Err(parse_error(resp).await?); + } + + self.core.path_cache.remove(&target).await; + resp.into_body().consume().await?; + } + + let resp = self + .core + .gdrive_patch_metadata_request(&source, &target) + .await?; let status = resp.status(); @@ -266,10 +246,12 @@ impl Accessor for GdriveBackend { let meta = serde_json::from_slice::(&body) .map_err(new_json_deserialize_error)?; - let mut cache = self.core.path_cache.lock().await; + let cache = &self.core.path_cache; - cache.remove(&build_abs_path(&self.core.root, from)); - cache.insert(build_abs_path(&self.core.root, to), meta.id.clone()); + cache.remove(&build_abs_path(&self.core.root, from)).await; + cache + .insert(&build_abs_path(&self.core.root, to), &meta.id) + .await; Ok(RpRename::default()) } @@ -277,39 +259,3 @@ impl Accessor for GdriveBackend { } } } - -impl GdriveBackend { - pub(crate) fn parse_metadata(&self, body: Bytes) -> Result { - let metadata = - serde_json::from_slice::(&body).map_err(new_json_deserialize_error)?; - - let mut meta = Metadata::new(match metadata.mime_type.as_str() { - "application/vnd.google-apps.folder" => EntryMode::DIR, - _ => EntryMode::FILE, - }); - - let size = if meta.mode() == EntryMode::DIR { - // Google Drive does not return the size for folders. - 0 - } else { - metadata - .size - .expect("file size must exist") - .parse::() - .map_err(|e| { - Error::new(ErrorKind::Unexpected, "parse content length").set_source(e) - })? - }; - meta = meta.with_content_length(size); - meta = meta.with_last_modified( - metadata - .modified_time - .expect("modified time must exist. please check your query param - fields") - .parse::>() - .map_err(|e| { - Error::new(ErrorKind::Unexpected, "parse last modified time").set_source(e) - })?, - ); - Ok(meta) - } -} diff --git a/core/src/services/gdrive/builder.rs b/core/src/services/gdrive/builder.rs index 984c2790901..f3ca9b1bc49 100644 --- a/core/src/services/gdrive/builder.rs +++ b/core/src/services/gdrive/builder.rs @@ -26,10 +26,10 @@ use log::debug; use tokio::sync::Mutex; use super::backend::GdriveBackend; -use crate::raw::normalize_root; use crate::raw::HttpClient; -use crate::services::gdrive::core::GdriveCore; +use crate::raw::{normalize_root, PathCacher}; use crate::services::gdrive::core::GdriveSigner; +use crate::services::gdrive::core::{GdriveCore, GdrivePathQuery}; use crate::Scheme; use crate::*; @@ -144,13 +144,13 @@ impl Builder for GdriveBuilder { })? }; - let signer = match (self.access_token.take(), self.refresh_token.take()) { - (Some(access_token), None) => GdriveSigner { - access_token, + let mut signer = GdriveSigner::new(client.clone()); + match (self.access_token.take(), self.refresh_token.take()) { + (Some(access_token), None) => { + signer.access_token = access_token; // We will never expire user specified access token. - expires_in: DateTime::::MAX_UTC, - ..Default::default() - }, + signer.expires_in = DateTime::::MAX_UTC; + } (None, Some(refresh_token)) => { let client_id = self.client_id.take().ok_or_else(|| { Error::new( @@ -167,12 +167,10 @@ impl Builder for GdriveBuilder { .with_context("service", Scheme::Gdrive) })?; - GdriveSigner { - refresh_token, - client_id, - client_secret, - ..Default::default() - } + signer.refresh_token = refresh_token; + signer.client = client.clone(); + signer.client_id = client_id; + signer.client_secret = client_secret; } (Some(_), Some(_)) => { return Err(Error::new( @@ -190,12 +188,13 @@ impl Builder for GdriveBuilder { } }; + let signer = Arc::new(Mutex::new(signer)); Ok(GdriveBackend { core: Arc::new(GdriveCore { root, - signer: Arc::new(Mutex::new(signer)), - client, - path_cache: Arc::default(), + signer: signer.clone(), + client: client.clone(), + path_cache: PathCacher::new(GdrivePathQuery::new(client, signer)).with_lock(), }), }) } diff --git a/core/src/services/gdrive/core.rs b/core/src/services/gdrive/core.rs index f43e80acd3a..73685b86320 100644 --- a/core/src/services/gdrive/core.rs +++ b/core/src/services/gdrive/core.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use std::collections::HashMap; +use async_trait::async_trait; use std::fmt::Debug; use std::fmt::Formatter; use std::sync::Arc; @@ -46,15 +46,7 @@ pub struct GdriveCore { pub signer: Arc>, /// Cache the mapping from path to file id - /// - /// Google Drive uses file id to identify a file. - /// As the path is immutable, we can cache the mapping from path to file id. - /// - /// # Notes - /// - /// - The path is rooted at the root of the Google Drive. - /// - The path is absolute path, like `foo/bar`. - pub path_cache: Arc>>, + pub path_cache: PathCacher, } impl Debug for GdriveCore { @@ -66,213 +58,9 @@ impl Debug for GdriveCore { } impl GdriveCore { - /// Get the file id by path. - /// Including file and folder. - /// - /// The path is rooted at the root of the Google Drive. - /// - /// # Notes - /// - /// - A path is a sequence of file names separated by slashes. - /// - A file only knows its parent id, but not its name. - /// - To find the file id of a file, we need to traverse the path from the root to the file. - pub(crate) async fn get_file_id_by_path(&self, file_path: &str) -> Result> { - let path = build_abs_path(&self.root, file_path); - - let mut cache = self.path_cache.lock().await; - - if let Some(id) = cache.get(&path) { - return Ok(Some(id.to_owned())); - } - - let mut parent_id = "root".to_owned(); - let file_path_items: Vec<&str> = path.split('/').filter(|&x| !x.is_empty()).collect(); - - for (i, item) in file_path_items.iter().enumerate() { - let path_part = file_path_items[0..=i].join("/"); - if let Some(id) = cache.get(&path_part) { - parent_id = id.to_owned(); - continue; - } - - let id = if i != file_path_items.len() - 1 || path.ends_with('/') { - self.gdrive_search_folder(&parent_id, item).await? - } else { - self.gdrive_search_file(&parent_id, item) - .await? - .map(|v| v.id) - }; - - if let Some(id) = id { - parent_id = id; - cache.insert(path_part, parent_id.clone()); - } else { - return Ok(None); - }; - } - - Ok(Some(parent_id)) - } - - /// Ensure the parent path exists. - /// If the parent path does not exist, create it. - /// - /// # Notes - /// - /// - The path is rooted at the root of the Google Drive. - /// - Will create the parent path recursively. - pub(crate) async fn ensure_parent_path(&self, path: &str) -> Result { - let path = build_abs_path(&self.root, path); - - let mut parent: String = "root".to_owned(); - let mut file_path_items: Vec<&str> = path.split('/').filter(|&x| !x.is_empty()).collect(); - file_path_items.pop(); - - let mut cache = self.path_cache.lock().await; - - for (i, item) in file_path_items.iter().enumerate() { - let path_part = file_path_items[0..=i].join("/"); - if let Some(id) = cache.get(&path_part) { - parent = id.to_owned(); - continue; - } - - let folder_id = self.gdrive_search_folder(&parent, item).await?; - let folder_id = if let Some(id) = folder_id { - id - } else { - self.gdrive_create_folder(&parent, item).await? - }; - - parent = folder_id; - cache.insert(path_part, parent.clone()); - } - - Ok(parent.to_owned()) - } - - /// Search a folder by name - /// - /// returns it's file id if exists, otherwise returns `None`. - pub async fn gdrive_search_file( - &self, - parent: &str, - basename: &str, - ) -> Result> { - let query = - format!("name = \"{basename}\" and \"{parent}\" in parents and trashed = false"); - let url = format!( - "https://www.googleapis.com/drive/v3/files?q={}", - percent_encode_path(&query) - ); - - let mut req = Request::get(&url) - .body(AsyncBody::Empty) - .map_err(new_request_build_error)?; - - self.sign(&mut req).await?; - - let resp = self.client.send(req).await?; - let status = resp.status(); - if !status.is_success() { - return Err(parse_error(resp).await?); - } - - let body = resp.into_body().bytes().await?; - let mut file_list: GdriveFileList = - serde_json::from_slice(&body).map_err(new_json_deserialize_error)?; - - if file_list.files.len() > 1 { - return Err(Error::new( - ErrorKind::Unexpected, - "please ensure that the file corresponding to the path is unique.", - )); - } - - Ok(file_list.files.pop()) - } - - /// Search a folder by name - /// - /// returns it's file id if exists, otherwise returns `None`. - pub async fn gdrive_search_folder( - &self, - parent: &str, - basename: &str, - ) -> Result> { - let query = format!( - "name = \"{}\" and \"{}\" in parents and trashed = false and mimeType = 'application/vnd.google-apps.folder'", - basename, parent - ); - let url = format!( - "https://www.googleapis.com/drive/v3/files?q={}", - percent_encode_path(query.as_str()) - ); - - let mut req = Request::get(&url) - .body(AsyncBody::Empty) - .map_err(new_request_build_error)?; - - self.sign(&mut req).await?; - - let resp = self.client.send(req).await?; - let status = resp.status(); - - match status { - StatusCode::OK => { - let body = resp.into_body().bytes().await?; - let meta: GdriveFileList = - serde_json::from_slice(&body).map_err(new_json_deserialize_error)?; - - if let Some(f) = meta.files.first() { - Ok(Some(f.id.clone())) - } else { - Ok(None) - } - } - _ => Err(parse_error(resp).await?), - } - } - - /// Create a folder. - /// - /// # Input - /// - /// `parent_id` is the parent folder id. - /// - /// # Output - /// - /// Returns created folder's id while success, otherwise returns an error. - pub async fn gdrive_create_folder(&self, parent_id: &str, name: &str) -> Result { - let url = "https://www.googleapis.com/drive/v3/files"; - - let content = serde_json::to_vec(&json!({ - "name": name, - "mimeType": "application/vnd.google-apps.folder", - // If the parent is not provided, the folder will be created in the root folder. - "parents": [parent_id], - })) - .map_err(new_json_serialize_error)?; - - let mut req = Request::post(url) - .header(header::CONTENT_TYPE, "application/json") - .body(AsyncBody::Bytes(Bytes::from(content))) - .map_err(new_request_build_error)?; - - self.sign(&mut req).await?; - - let resp = self.client.send(req).await?; - if !resp.status().is_success() { - return Err(parse_error(resp).await?); - } - let body = resp.into_body().bytes().await?; - let file: GdriveFile = serde_json::from_slice(&body).map_err(new_json_deserialize_error)?; - - Ok(file.id) - } - pub async fn gdrive_stat(&self, path: &str) -> Result> { - let path_id = self.get_file_id_by_path(path).await?.ok_or(Error::new( + let path = build_abs_path(&self.root, path); + let file_id = self.path_cache.get(&path).await?.ok_or(Error::new( ErrorKind::NotFound, &format!("path not found: {}", path), ))?; @@ -281,7 +69,7 @@ impl GdriveCore { // For now, we only need the file id, name, mime type and modified time. let mut req = Request::get(&format!( "https://www.googleapis.com/drive/v3/files/{}?fields=id,name,mimeType,size,modifiedTime", - path_id + file_id )) .body(AsyncBody::Empty) .map_err(new_request_build_error)?; @@ -291,7 +79,8 @@ impl GdriveCore { } pub async fn gdrive_get(&self, path: &str) -> Result> { - let path_id = self.get_file_id_by_path(path).await?.ok_or(Error::new( + let path = build_abs_path(&self.root, path); + let path_id = self.path_cache.get(&path).await?.ok_or(Error::new( ErrorKind::NotFound, &format!("path not found: {}", path), ))?; @@ -339,37 +128,30 @@ impl GdriveCore { source: &str, target: &str, ) -> Result> { - let file_id = self.get_file_id_by_path(source).await?.ok_or(Error::new( + let source_file_id = self.path_cache.get(source).await?.ok_or(Error::new( ErrorKind::NotFound, &format!("source path not found: {}", source), ))?; + let source_parent = get_parent(source); + let source_parent_id = self + .path_cache + .get(source_parent) + .await? + .expect("old parent must exist"); - let parent = self.ensure_parent_path(target).await?; - - let url = format!("https://www.googleapis.com/drive/v3/files/{}", file_id); - - let source_abs_path = build_abs_path(&self.root, source); - let mut source_parent: Vec<&str> = source_abs_path - .split('/') - .filter(|&x| !x.is_empty()) - .collect(); - source_parent.pop(); - - let cache = self.path_cache.lock().await; - - let file_name = build_abs_path(&self.root, target) - .split('/') - .filter(|&x| !x.is_empty()) - .last() - .unwrap() - .to_string(); + let target_parent_id = self.path_cache.ensure_dir(get_parent(target)).await?; + let target_file_name = get_basename(target); let metadata = &json!({ - "name": file_name, - "removeParents": [cache.get(&source_parent.join("/")).unwrap().to_string()], - "addParents": [parent], + "name": target_file_name, + "removeParents": [source_parent_id], + "addParents": [target_parent_id], }); + let url = format!( + "https://www.googleapis.com/drive/v3/files/{}", + source_file_id + ); let mut req = Request::patch(url) .body(AsyncBody::Bytes(Bytes::from(metadata.to_string()))) .map_err(new_request_build_error)?; @@ -379,11 +161,7 @@ impl GdriveCore { self.client.send(req).await } - pub async fn gdrive_delete(&self, path: &str) -> Result> { - let file_id = self.get_file_id_by_path(path).await?.ok_or(Error::new( - ErrorKind::NotFound, - &format!("path not found: {}", path), - ))?; + pub async fn gdrive_delete(&self, file_id: &str) -> Result> { let url = format!("https://www.googleapis.com/drive/v3/files/{}", file_id); let mut req = Request::delete(&url) @@ -402,16 +180,17 @@ impl GdriveCore { size: u64, body: Bytes, ) -> Result> { - let parent = self.ensure_parent_path(path).await?; + let parent = self.path_cache.ensure_dir(get_parent(path)).await?; let url = "https://www.googleapis.com/upload/drive/v3/files?uploadType=multipart"; - let file_name = path.split('/').filter(|&x| !x.is_empty()).last().unwrap(); + let file_name = get_basename(path); - let metadata = &json!({ + let metadata = serde_json::to_vec(&json!({ "name": file_name, "parents": [parent], - }); + })) + .map_err(new_json_serialize_error)?; let req = Request::post(url).header("X-Upload-Content-Length", size); @@ -422,7 +201,7 @@ impl GdriveCore { header::CONTENT_TYPE, "application/json; charset=UTF-8".parse().unwrap(), ) - .content(metadata.to_string()), + .content(metadata), ) .part( FormDataPart::new("file") @@ -470,9 +249,40 @@ impl GdriveCore { pub async fn sign(&self, req: &mut Request) -> Result<()> { let mut signer = self.signer.lock().await; + signer.sign(req).await + } +} - if !signer.access_token.is_empty() && signer.expires_in > Utc::now() { - let value = format!("Bearer {}", signer.access_token) +#[derive(Clone)] +pub struct GdriveSigner { + pub client: HttpClient, + + pub client_id: String, + pub client_secret: String, + pub refresh_token: String, + + pub access_token: String, + pub expires_in: DateTime, +} + +impl GdriveSigner { + /// Create a new signer. + pub fn new(client: HttpClient) -> Self { + GdriveSigner { + client, + + client_id: "".to_string(), + client_secret: "".to_string(), + refresh_token: "".to_string(), + access_token: "".to_string(), + expires_in: DateTime::::MIN_UTC, + } + } + + /// Sign a request. + pub async fn sign(&mut self, req: &mut Request) -> Result<()> { + if !self.access_token.is_empty() && self.expires_in > Utc::now() { + let value = format!("Bearer {}", self.access_token) .parse() .expect("access token must be valid header value"); @@ -482,7 +292,7 @@ impl GdriveCore { let url = format!( "https://oauth2.googleapis.com/token?refresh_token={}&client_id={}&client_secret={}&grant_type=refresh_token", - signer.refresh_token, signer.client_id, signer.client_secret + self.refresh_token, self.client_id, self.client_secret ); { @@ -498,8 +308,8 @@ impl GdriveCore { let resp_body = &resp.into_body().bytes().await?; let token = serde_json::from_slice::(resp_body) .map_err(new_json_deserialize_error)?; - signer.access_token = token.access_token.clone(); - signer.expires_in = Utc::now() + chrono::Duration::seconds(token.expires_in) + self.access_token = token.access_token.clone(); + self.expires_in = Utc::now() + chrono::Duration::seconds(token.expires_in) - chrono::Duration::seconds(120); } _ => { @@ -508,7 +318,7 @@ impl GdriveCore { } } - let auth_header_content = format!("Bearer {}", signer.access_token); + let auth_header_content = format!("Bearer {}", self.access_token); req.headers_mut() .insert(header::AUTHORIZATION, auth_header_content.parse().unwrap()); @@ -516,26 +326,94 @@ impl GdriveCore { } } -#[derive(Clone)] -pub struct GdriveSigner { - pub client_id: String, - pub client_secret: String, - pub refresh_token: String, +pub struct GdrivePathQuery { + pub client: HttpClient, + pub signer: Arc>, +} - pub access_token: String, - pub expires_in: DateTime, +impl GdrivePathQuery { + pub fn new(client: HttpClient, signer: Arc>) -> Self { + GdrivePathQuery { client, signer } + } } -impl Default for GdriveSigner { - fn default() -> Self { - GdriveSigner { - access_token: String::new(), - expires_in: DateTime::::MIN_UTC, +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +impl PathQuery for GdrivePathQuery { + async fn root(&self) -> Result { + Ok("root".to_string()) + } + + async fn query(&self, parent_id: &str, name: &str) -> Result> { + let mut queries = vec![ + // Make sure name has been replaced with escaped name. + // + // ref: + format!("name = '{}'", name.replace('\'', "\\'")), + format!("'{}' in parents", parent_id), + "trashed = false".to_string(), + ]; + if name.ends_with('/') { + queries.push("mimeType = 'application/vnd.google-apps.folder'".to_string()); + } + let query = queries.join(" and "); + + let url = format!( + "https://www.googleapis.com/drive/v3/files?q={}", + percent_encode_path(query.as_str()) + ); + + let mut req = Request::get(&url) + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.signer.lock().await.sign(&mut req).await?; + + let resp = self.client.send(req).await?; + let status = resp.status(); + + match status { + StatusCode::OK => { + let body = resp.into_body().bytes().await?; + let meta: GdriveFileList = + serde_json::from_slice(&body).map_err(new_json_deserialize_error)?; + + if let Some(f) = meta.files.first() { + Ok(Some(f.id.clone())) + } else { + Ok(None) + } + } + _ => Err(parse_error(resp).await?), + } + } + + async fn create_dir(&self, parent_id: &str, name: &str) -> Result { + let url = "https://www.googleapis.com/drive/v3/files"; + + let content = serde_json::to_vec(&json!({ + "name": name, + "mimeType": "application/vnd.google-apps.folder", + // If the parent is not provided, the folder will be created in the root folder. + "parents": [parent_id], + })) + .map_err(new_json_serialize_error)?; + + let mut req = Request::post(url) + .header(header::CONTENT_TYPE, "application/json") + .body(AsyncBody::Bytes(Bytes::from(content))) + .map_err(new_request_build_error)?; + + self.signer.lock().await.sign(&mut req).await?; - refresh_token: String::new(), - client_id: String::new(), - client_secret: String::new(), + let resp = self.client.send(req).await?; + if !resp.status().is_success() { + return Err(parse_error(resp).await?); } + + let body = resp.into_body().bytes().await?; + let file: GdriveFile = serde_json::from_slice(&body).map_err(new_json_deserialize_error)?; + Ok(file.id) } } @@ -545,9 +423,9 @@ pub struct GdriveTokenResponse { expires_in: i64, } -// This is the file struct returned by the Google Drive API. -// This is a complex struct, but we only add the fields we need. -// refer to https://developers.google.com/drive/api/reference/rest/v3/files#File +/// This is the file struct returned by the Google Drive API. +/// This is a complex struct, but we only add the fields we need. +/// refer to https://developers.google.com/drive/api/reference/rest/v3/files#File #[derive(Deserialize, Debug)] #[serde(rename_all = "camelCase")] pub struct GdriveFile { @@ -563,7 +441,7 @@ pub struct GdriveFile { pub modified_time: Option, } -// refer to https://developers.google.com/drive/api/reference/rest/v3/files/list +/// refer to https://developers.google.com/drive/api/reference/rest/v3/files/list #[derive(Deserialize)] #[serde(rename_all = "camelCase")] pub(crate) struct GdriveFileList { diff --git a/core/src/services/gdrive/lister.rs b/core/src/services/gdrive/lister.rs index 9da239035d6..db0ad063a65 100644 --- a/core/src/services/gdrive/lister.rs +++ b/core/src/services/gdrive/lister.rs @@ -41,7 +41,7 @@ impl GdriveLister { #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl oio::PageList for GdriveLister { async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> { - let file_id = self.core.get_file_id_by_path(&self.path).await?; + let file_id = self.core.path_cache.get(&self.path).await?; let file_id = match file_id { Some(file_id) => file_id, @@ -78,18 +78,22 @@ impl oio::PageList for GdriveLister { for mut file in decoded_response.files { let file_type = if file.mime_type.as_str() == "application/vnd.google-apps.folder" { - file.name = format!("{}/", file.name); + if !file.name.ends_with('/') { + file.name += "/"; + } EntryMode::DIR } else { EntryMode::FILE }; let root = &self.core.root; - let path = format!("{}{}", build_rooted_abs_path(root, &self.path), file.name); + let path = format!("{}{}", &self.path, file.name); let normalized_path = build_rel_path(root, &path); - let entry = oio::Entry::new(&normalized_path, Metadata::new(file_type)); + // Update path cache with list result. + self.core.path_cache.insert(&path, &file.id).await; + let entry = oio::Entry::new(&normalized_path, Metadata::new(file_type)); ctx.entries.push_back(entry); } diff --git a/core/src/services/gdrive/writer.rs b/core/src/services/gdrive/writer.rs index f902ed2559c..251787064ea 100644 --- a/core/src/services/gdrive/writer.rs +++ b/core/src/services/gdrive/writer.rs @@ -18,10 +18,9 @@ use std::sync::Arc; use async_trait::async_trait; -use bytes::Bytes; use http::StatusCode; -use super::core::GdriveCore; +use super::core::{GdriveCore, GdriveFile}; use super::error::parse_error; use crate::raw::oio::WriteBuf; use crate::raw::*; @@ -44,47 +43,6 @@ impl GdriveWriter { file_id, } } - - /// Write a single chunk of data to the object. - /// - /// This is used for small objects. - /// And should overwrite the object if it already exists. - pub async fn write_create(&self, size: u64, body: Bytes) -> Result<()> { - let resp = self - .core - .gdrive_upload_simple_request(&self.path, size, body) - .await?; - - let status = resp.status(); - - match status { - StatusCode::OK | StatusCode::CREATED => { - resp.into_body().consume().await?; - Ok(()) - } - _ => Err(parse_error(resp).await?), - } - } - - pub async fn write_overwrite(&self, size: u64, body: Bytes) -> Result<()> { - let file_id = self.file_id.as_ref().ok_or_else(|| { - Error::new(ErrorKind::Unexpected, "file_id is required for overwrite") - })?; - let resp = self - .core - .gdrive_upload_overwrite_simple_request(file_id, size, body) - .await?; - - let status = resp.status(); - - match status { - StatusCode::OK => { - resp.into_body().consume().await?; - Ok(()) - } - _ => Err(parse_error(resp).await?), - } - } } #[cfg_attr(not(target_arch = "wasm32"), async_trait)] @@ -93,12 +51,32 @@ impl oio::OneShotWrite for GdriveWriter { async fn write_once(&self, bs: &dyn WriteBuf) -> Result<()> { let bs = bs.bytes(bs.remaining()); let size = bs.len(); - if self.file_id.is_none() { - self.write_create(size as u64, bs).await?; + + let resp = if let Some(file_id) = &self.file_id { + self.core + .gdrive_upload_overwrite_simple_request(file_id, size as u64, bs) + .await } else { - self.write_overwrite(size as u64, bs).await?; - } + self.core + .gdrive_upload_simple_request(&self.path, size as u64, bs) + .await + }?; - Ok(()) + let status = resp.status(); + match status { + StatusCode::OK | StatusCode::CREATED => { + // If we don't have the file id before, let's update the cache to avoid re-fetching. + if self.file_id.is_none() { + let bs = resp.into_body().bytes().await?; + let file: GdriveFile = + serde_json::from_slice(&bs).map_err(new_json_deserialize_error)?; + self.core.path_cache.insert(&self.path, &file.id).await; + } else { + resp.into_body().consume().await?; + } + Ok(()) + } + _ => Err(parse_error(resp).await?), + } } } diff --git a/core/tests/behavior/async_list.rs b/core/tests/behavior/async_list.rs index 2ac169c20df..ffb64e0058c 100644 --- a/core/tests/behavior/async_list.rs +++ b/core/tests/behavior/async_list.rs @@ -259,11 +259,15 @@ pub async fn test_list_empty_dir(op: Operator) -> Result<()> { while let Some(de) = obs.try_next().await? { objects.insert(de.path().to_string(), de); } - assert_eq!(objects.len(), 1, "only return the dir itself"); + assert_eq!( + objects.len(), + 1, + "only return the dir itself, but found: {objects:?}" + ); assert_eq!( objects[&dir].metadata().mode(), EntryMode::DIR, - "given dir should exist and must be dir" + "given dir should exist and must be dir, but found: {objects:?}" ); // List "dir/" should return empty object. @@ -317,6 +321,7 @@ pub async fn test_list_sub_dir(op: Operator) -> Result<()> { let mut obs = op.lister("/").await?; let mut found = false; + let mut entries = vec![]; while let Some(de) = obs.try_next().await? { if de.path() == path { let meta = op.stat(&path).await?; @@ -325,8 +330,13 @@ pub async fn test_list_sub_dir(op: Operator) -> Result<()> { found = true } + entries.push(de) } - assert!(found, "dir should be found in list"); + assert!( + found, + "dir should be found in list, but only got: {:?}", + entries + ); op.delete(&path).await.expect("delete must succeed"); Ok(())