From e98498e2537380cd1d0ececec18416f893e17c7d Mon Sep 17 00:00:00 2001 From: Yuhui Date: Fri, 3 Jan 2025 17:03:39 +0800 Subject: [PATCH] [#6012] feat (gvfs-fuse): Support Gravitino S3 fileset filesystem operation in gvfs fuse (#6013) ### What changes were proposed in this pull request? Support a Gravitino S3 fileset filesystem operation in gvfs fuse, implemented by OpenDal ### Why are the changes needed? Fix: #6012 ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Manually test --------- Co-authored-by: Qiming Teng --- clients/filesystem-fuse/Cargo.toml | 1 + clients/filesystem-fuse/conf/gvfs_fuse.toml | 6 +- clients/filesystem-fuse/src/config.rs | 6 +- .../src/default_raw_filesystem.rs | 98 ++++-- clients/filesystem-fuse/src/error.rs | 2 + clients/filesystem-fuse/src/filesystem.rs | 109 ++++--- .../filesystem-fuse/src/fuse_api_handle.rs | 12 +- .../filesystem-fuse/src/gravitino_client.rs | 76 +++++ .../src/gravitino_fileset_filesystem.rs | 57 +++- clients/filesystem-fuse/src/gvfs_creator.rs | 166 ++++++++++ clients/filesystem-fuse/src/gvfs_fuse.rs | 127 +------- clients/filesystem-fuse/src/lib.rs | 3 + clients/filesystem-fuse/src/main.rs | 32 +- .../filesystem-fuse/src/memory_filesystem.rs | 32 +- .../src/open_dal_filesystem.rs | 297 ++++++++++++++++++ clients/filesystem-fuse/src/opened_file.rs | 26 ++ clients/filesystem-fuse/src/s3_filesystem.rs | 276 ++++++++++++++++ clients/filesystem-fuse/src/utils.rs | 29 +- .../{gvfs_fuse_test.toml => config_test.toml} | 6 +- .../tests/conf/gvfs_fuse_memory.toml | 8 +- .../tests/conf/gvfs_fuse_s3.toml | 43 +++ clients/filesystem-fuse/tests/fuse_test.rs | 21 +- 22 files changed, 1202 insertions(+), 231 deletions(-) create mode 100644 clients/filesystem-fuse/src/gvfs_creator.rs create mode 100644 clients/filesystem-fuse/src/open_dal_filesystem.rs create mode 100644 clients/filesystem-fuse/src/s3_filesystem.rs rename clients/filesystem-fuse/tests/conf/{gvfs_fuse_test.toml => config_test.toml} (91%) create mode 100644 clients/filesystem-fuse/tests/conf/gvfs_fuse_s3.toml diff --git a/clients/filesystem-fuse/Cargo.toml b/clients/filesystem-fuse/Cargo.toml index 4008ec5ca2f..3760bd5285f 100644 --- a/clients/filesystem-fuse/Cargo.toml +++ b/clients/filesystem-fuse/Cargo.toml @@ -42,6 +42,7 @@ futures-util = "0.3.30" libc = "0.2.168" log = "0.4.22" once_cell = "1.20.2" +opendal = { version = "0.46.0", features = ["services-s3"] } reqwest = { version = "0.12.9", features = ["json"] } serde = { version = "1.0.216", features = ["derive"] } tokio = { version = "1.38.0", features = ["full"] } diff --git a/clients/filesystem-fuse/conf/gvfs_fuse.toml b/clients/filesystem-fuse/conf/gvfs_fuse.toml index 94d3d8560fd..4bde0e9e1bd 100644 --- a/clients/filesystem-fuse/conf/gvfs_fuse.toml +++ b/clients/filesystem-fuse/conf/gvfs_fuse.toml @@ -32,7 +32,7 @@ block_size = 8192 uri = "http://localhost:8090" metalake = "your_metalake" -# extent settings +# extend settings [extend_config] -access_key = "your access_key" -secret_key = "your_secret_key" +s3-access_key_id = "your access_key" +s3-secret_access_key = "your_secret_key" diff --git a/clients/filesystem-fuse/src/config.rs b/clients/filesystem-fuse/src/config.rs index b381caa75c5..17908fd08fc 100644 --- a/clients/filesystem-fuse/src/config.rs +++ b/clients/filesystem-fuse/src/config.rs @@ -302,18 +302,18 @@ mod test { #[test] fn test_config_from_file() { - let config = AppConfig::from_file(Some("tests/conf/gvfs_fuse_test.toml")).unwrap(); + let config = AppConfig::from_file(Some("tests/conf/config_test.toml")).unwrap(); assert_eq!(config.fuse.file_mask, 0o644); assert_eq!(config.fuse.dir_mask, 0o755); assert_eq!(config.filesystem.block_size, 8192); assert_eq!(config.gravitino.uri, "http://localhost:8090"); assert_eq!(config.gravitino.metalake, "test"); assert_eq!( - config.extend_config.get("access_key"), + config.extend_config.get("s3-access_key_id"), Some(&"XXX_access_key".to_string()) ); assert_eq!( - config.extend_config.get("secret_key"), + config.extend_config.get("s3-secret_access_key"), Some(&"XXX_secret_key".to_string()) ); } diff --git a/clients/filesystem-fuse/src/default_raw_filesystem.rs b/clients/filesystem-fuse/src/default_raw_filesystem.rs index 0c9836e5b33..944181246d5 100644 --- a/clients/filesystem-fuse/src/default_raw_filesystem.rs +++ b/clients/filesystem-fuse/src/default_raw_filesystem.rs @@ -18,10 +18,11 @@ */ use crate::config::AppConfig; use crate::filesystem::{ - FileStat, FileSystemContext, PathFileSystem, RawFileSystem, Result, INITIAL_FILE_ID, - ROOT_DIR_FILE_ID, ROOT_DIR_PARENT_FILE_ID, ROOT_DIR_PATH, + FileStat, FileSystemContext, PathFileSystem, RawFileSystem, Result, FS_META_FILE_ID, + FS_META_FILE_NAME, FS_META_FILE_PATH, INITIAL_FILE_ID, ROOT_DIR_FILE_ID, + ROOT_DIR_PARENT_FILE_ID, ROOT_DIR_PATH, }; -use crate::opened_file::{FileHandle, OpenFileFlags}; +use crate::opened_file::{FileHandle, OpenFileFlags, OpenedFile}; use crate::opened_file_manager::OpenedFileManager; use async_trait::async_trait; use bytes::Bytes; @@ -78,6 +79,7 @@ impl DefaultRawFileSystem { } async fn resolve_file_id_to_filestat(&self, file_stat: &mut FileStat, parent_file_id: u64) { + debug_assert!(parent_file_id != 0); let mut file_manager = self.file_entry_manager.write().await; let file_entry = file_manager.get_file_entry_by_path(&file_stat.path); match file_entry { @@ -132,6 +134,21 @@ impl DefaultRawFileSystem { let mut file_manager = self.file_entry_manager.write().await; file_manager.insert(parent_file_id, file_id, path); } + + fn get_meta_file_stat(&self) -> FileStat { + let mut meta_file_stat = + FileStat::new_file_filestat_with_path(Path::new(FS_META_FILE_PATH), 0); + meta_file_stat.set_file_id(ROOT_DIR_FILE_ID, FS_META_FILE_ID); + meta_file_stat + } + + fn is_meta_file(&self, file_id: u64) -> bool { + file_id == FS_META_FILE_ID + } + + fn is_meta_file_name(&self, parent_file_id: u64, name: &OsStr) -> bool { + parent_file_id == ROOT_DIR_FILE_ID && name == OsStr::new(FS_META_FILE_NAME) + } } #[async_trait] @@ -144,6 +161,13 @@ impl RawFileSystem for DefaultRawFileSystem { Path::new(ROOT_DIR_PATH), ) .await; + + self.insert_file_entry_locked( + ROOT_DIR_FILE_ID, + FS_META_FILE_ID, + Path::new(FS_META_FILE_PATH), + ) + .await; self.fs.init().await } @@ -168,6 +192,10 @@ impl RawFileSystem for DefaultRawFileSystem { } async fn stat(&self, file_id: u64) -> Result { + if self.is_meta_file(file_id) { + return Ok(self.get_meta_file_stat()); + } + let file_entry = self.get_file_entry(file_id).await?; let mut file_stat = self.fs.stat(&file_entry.path).await?; file_stat.set_file_id(file_entry.parent_file_id, file_entry.file_id); @@ -175,8 +203,11 @@ impl RawFileSystem for DefaultRawFileSystem { } async fn lookup(&self, parent_file_id: u64, name: &OsStr) -> Result { - let parent_file_entry = self.get_file_entry(parent_file_id).await?; + if self.is_meta_file_name(parent_file_id, name) { + return Ok(self.get_meta_file_stat()); + } + let parent_file_entry = self.get_file_entry(parent_file_id).await?; let path = parent_file_entry.path.join(name); let mut file_stat = self.fs.stat(&path).await?; // fill the file id to file stat @@ -192,10 +223,21 @@ impl RawFileSystem for DefaultRawFileSystem { for file_stat in child_filestats.iter_mut() { self.resolve_file_id_to_filestat(file_stat, file_id).await; } + + if file_id == ROOT_DIR_FILE_ID { + child_filestats.push(self.get_meta_file_stat()); + } Ok(child_filestats) } async fn open_file(&self, file_id: u64, flags: u32) -> Result { + if self.is_meta_file(file_id) { + let meta_file = OpenedFile::new(self.get_meta_file_stat()); + let resutl = self.opened_file_manager.put(meta_file); + let file = resutl.lock().await; + return Ok(file.file_handle()); + } + self.open_file_internal(file_id, flags, FileType::RegularFile) .await } @@ -211,6 +253,10 @@ impl RawFileSystem for DefaultRawFileSystem { name: &OsStr, flags: u32, ) -> Result { + if self.is_meta_file_name(parent_file_id, name) { + return Err(Errno::from(libc::EEXIST)); + } + let parent_file_entry = self.get_file_entry(parent_file_id).await?; let mut file_without_id = self .fs @@ -247,11 +293,19 @@ impl RawFileSystem for DefaultRawFileSystem { } async fn set_attr(&self, file_id: u64, file_stat: &FileStat) -> Result<()> { + if self.is_meta_file(file_id) { + return Ok(()); + } + let file_entry = self.get_file_entry(file_id).await?; self.fs.set_attr(&file_entry.path, file_stat, true).await } async fn remove_file(&self, parent_file_id: u64, name: &OsStr) -> Result<()> { + if self.is_meta_file_name(parent_file_id, name) { + return Err(Errno::from(libc::EPERM)); + } + let parent_file_entry = self.get_file_entry(parent_file_id).await?; let path = parent_file_entry.path.join(name); self.fs.remove_file(&path).await?; @@ -271,6 +325,15 @@ impl RawFileSystem for DefaultRawFileSystem { Ok(()) } + async fn flush_file(&self, _file_id: u64, fh: u64) -> Result<()> { + let opened_file = self + .opened_file_manager + .get(fh) + .ok_or(Errno::from(libc::EBADF))?; + let mut file = opened_file.lock().await; + file.flush().await + } + async fn close_file(&self, _file_id: u64, fh: u64) -> Result<()> { let opened_file = self .opened_file_manager @@ -280,7 +343,11 @@ impl RawFileSystem for DefaultRawFileSystem { file.close().await } - async fn read(&self, _file_id: u64, fh: u64, offset: u64, size: u32) -> Result { + async fn read(&self, file_id: u64, fh: u64, offset: u64, size: u32) -> Result { + if self.is_meta_file(file_id) { + return Ok(Bytes::new()); + } + let (data, file_stat) = { let opened_file = self .opened_file_manager @@ -297,7 +364,11 @@ impl RawFileSystem for DefaultRawFileSystem { data } - async fn write(&self, _file_id: u64, fh: u64, offset: u64, data: &[u8]) -> Result { + async fn write(&self, file_id: u64, fh: u64, offset: u64, data: &[u8]) -> Result { + if self.is_meta_file(file_id) { + return Err(Errno::from(libc::EPERM)); + } + let (len, file_stat) = { let opened_file = self .opened_file_manager @@ -368,8 +439,6 @@ impl FileEntryManager { #[cfg(test)] mod tests { use super::*; - use crate::filesystem::tests::TestRawFileSystem; - use crate::memory_filesystem::MemoryFileSystem; #[test] fn test_file_entry_manager() { @@ -389,17 +458,4 @@ mod tests { assert!(manager.get_file_entry_by_id(2).is_none()); assert!(manager.get_file_entry_by_path(Path::new("a/b")).is_none()); } - - #[tokio::test] - async fn test_default_raw_file_system() { - let memory_fs = MemoryFileSystem::new().await; - let raw_fs = DefaultRawFileSystem::new( - memory_fs, - &AppConfig::default(), - &FileSystemContext::default(), - ); - let _ = raw_fs.init().await; - let mut tester = TestRawFileSystem::new(raw_fs); - tester.test_raw_file_system().await; - } } diff --git a/clients/filesystem-fuse/src/error.rs b/clients/filesystem-fuse/src/error.rs index ba3c037c5ca..7e38e46874c 100644 --- a/clients/filesystem-fuse/src/error.rs +++ b/clients/filesystem-fuse/src/error.rs @@ -24,6 +24,7 @@ pub enum ErrorCode { GravitinoClientError, InvalidConfig, ConfigNotFound, + OpenDalError, } impl ErrorCode { @@ -39,6 +40,7 @@ impl std::fmt::Display for ErrorCode { ErrorCode::GravitinoClientError => write!(f, "Gravitino client error"), ErrorCode::InvalidConfig => write!(f, "Invalid config"), ErrorCode::ConfigNotFound => write!(f, "Config not found"), + ErrorCode::OpenDalError => write!(f, "OpenDal error"), } } } diff --git a/clients/filesystem-fuse/src/filesystem.rs b/clients/filesystem-fuse/src/filesystem.rs index 742cdd4c879..dcf35f8ebca 100644 --- a/clients/filesystem-fuse/src/filesystem.rs +++ b/clients/filesystem-fuse/src/filesystem.rs @@ -36,6 +36,11 @@ pub(crate) const ROOT_DIR_NAME: &str = ""; pub(crate) const ROOT_DIR_PATH: &str = "/"; pub(crate) const INITIAL_FILE_ID: u64 = 10000; +// File system meta file is indicated the fuse filesystem is active. +pub(crate) const FS_META_FILE_PATH: &str = "/.gvfs_meta"; +pub(crate) const FS_META_FILE_NAME: &str = ".gvfs_meta"; +pub(crate) const FS_META_FILE_ID: u64 = 10; + /// RawFileSystem interface for the file system implementation. it use by FuseApiHandle, /// it ues the file id to operate the file system apis /// the `file_id` and `parent_file_id` it is the unique identifier for the file system, @@ -89,6 +94,9 @@ pub(crate) trait RawFileSystem: Send + Sync { /// Remove the directory by parent file id and file name async fn remove_dir(&self, parent_file_id: u64, name: &OsStr) -> Result<()>; + /// flush the file with file id and file handle, if successful return Ok + async fn flush_file(&self, file_id: u64, fh: u64) -> Result<()>; + /// Close the file by file id and file handle, if successful async fn close_file(&self, file_id: u64, fh: u64) -> Result<()>; @@ -289,57 +297,53 @@ pub trait FileWriter: Sync + Send { #[cfg(test)] pub(crate) mod tests { use super::*; + use libc::{O_APPEND, O_CREAT, O_RDONLY}; use std::collections::HashMap; + use std::path::Component; pub(crate) struct TestPathFileSystem { files: HashMap, fs: F, + cwd: PathBuf, } impl TestPathFileSystem { - pub(crate) fn new(fs: F) -> Self { + pub(crate) fn new(cwd: &Path, fs: F) -> Self { Self { files: HashMap::new(), fs, + cwd: cwd.into(), } } pub(crate) async fn test_path_file_system(&mut self) { - // Test root dir - self.test_root_dir().await; + // test root dir + let resutl = self.fs.stat(Path::new("/")).await; + assert!(resutl.is_ok()); + let root_file_stat = resutl.unwrap(); + self.assert_file_stat(&root_file_stat, Path::new("/"), Directory, 0); - // Test stat file - self.test_stat_file(Path::new("/.gvfs_meta"), RegularFile, 0) - .await; + // test list root dir + let result = self.fs.read_dir(Path::new("/")).await; + assert!(result.is_ok()); // Test create file - self.test_create_file(Path::new("/file1.txt")).await; + self.test_create_file(&self.cwd.join("file1.txt")).await; // Test create dir - self.test_create_dir(Path::new("/dir1")).await; + self.test_create_dir(&self.cwd.join("dir1")).await; // Test list dir - self.test_list_dir(Path::new("/")).await; + self.test_list_dir(&self.cwd).await; // Test remove file - self.test_remove_file(Path::new("/file1.txt")).await; + self.test_remove_file(&self.cwd.join("file1.txt")).await; // Test remove dir - self.test_remove_dir(Path::new("/dir1")).await; + self.test_remove_dir(&self.cwd.join("dir1")).await; // Test file not found - self.test_file_not_found(Path::new("unknown")).await; - - // Test list dir - self.test_list_dir(Path::new("/")).await; - } - - async fn test_root_dir(&mut self) { - let root_dir_path = Path::new("/"); - let root_file_stat = self.fs.stat(root_dir_path).await; - assert!(root_file_stat.is_ok()); - let root_file_stat = root_file_stat.unwrap(); - self.assert_file_stat(&root_file_stat, root_dir_path, Directory, 0); + self.test_file_not_found(&self.cwd.join("unknown")).await; } async fn test_stat_file(&mut self, path: &Path, expect_kind: FileType, expect_size: u64) { @@ -370,7 +374,6 @@ pub(crate) mod tests { let list_dir = self.fs.read_dir(path).await; assert!(list_dir.is_ok()); let list_dir = list_dir.unwrap(); - assert_eq!(list_dir.len(), self.files.len()); for file_stat in list_dir { assert!(self.files.contains_key(&file_stat.path)); let actual_file_stat = self.files.get(&file_stat.path).unwrap(); @@ -414,13 +417,15 @@ pub(crate) mod tests { pub(crate) struct TestRawFileSystem { fs: F, files: HashMap, + cwd: PathBuf, } impl TestRawFileSystem { - pub(crate) fn new(fs: F) -> Self { + pub(crate) fn new(cwd: &Path, fs: F) -> Self { Self { fs, files: HashMap::new(), + cwd: cwd.into(), } } @@ -431,31 +436,45 @@ pub(crate) mod tests { // test read root dir self.test_list_dir(ROOT_DIR_FILE_ID, false).await; - let parent_file_id = ROOT_DIR_FILE_ID; - // Test lookup file + // Test lookup meta file let file_id = self - .test_lookup_file(parent_file_id, ".gvfs_meta".as_ref(), RegularFile, 0) + .test_lookup_file(ROOT_DIR_FILE_ID, ".gvfs_meta".as_ref(), RegularFile, 0) .await; - // Test get file stat + // Test get meta file stat self.test_stat_file(file_id, Path::new("/.gvfs_meta"), RegularFile, 0) .await; // Test get file path self.test_get_file_path(file_id, "/.gvfs_meta").await; - // Test create file - self.test_create_file(parent_file_id, "file1.txt".as_ref()) - .await; + // get cwd file id + let mut parent_file_id = ROOT_DIR_FILE_ID; + for child in self.cwd.components() { + if child == Component::RootDir { + continue; + } + let file_id = self.fs.create_dir(parent_file_id, child.as_os_str()).await; + assert!(file_id.is_ok()); + parent_file_id = file_id.unwrap(); + } - // Test open file + // Test create file let file_handle = self - .test_open_file(parent_file_id, "file1.txt".as_ref()) + .test_create_file(parent_file_id, "file1.txt".as_ref()) .await; // Test write file self.test_write_file(&file_handle, "test").await; + // Test close file + self.test_close_file(&file_handle).await; + + // Test open file with read + let file_handle = self + .test_open_file(parent_file_id, "file1.txt".as_ref(), O_RDONLY as u32) + .await; + // Test read file self.test_read_file(&file_handle, "test").await; @@ -526,8 +545,11 @@ pub(crate) mod tests { self.files.insert(file_stat.file_id, file_stat); } - async fn test_create_file(&mut self, root_file_id: u64, name: &OsStr) { - let file = self.fs.create_file(root_file_id, name, 0).await; + async fn test_create_file(&mut self, root_file_id: u64, name: &OsStr) -> FileHandle { + let file = self + .fs + .create_file(root_file_id, name, (O_CREAT | O_APPEND) as u32) + .await; assert!(file.is_ok()); let file = file.unwrap(); assert!(file.handle_id > 0); @@ -537,11 +559,12 @@ pub(crate) mod tests { self.test_stat_file(file.file_id, &file_stat.unwrap().path, RegularFile, 0) .await; + file } - async fn test_open_file(&self, root_file_id: u64, name: &OsStr) -> FileHandle { + async fn test_open_file(&self, root_file_id: u64, name: &OsStr, flags: u32) -> FileHandle { let file = self.fs.lookup(root_file_id, name).await.unwrap(); - let file_handle = self.fs.open_file(file.file_id, 0).await; + let file_handle = self.fs.open_file(file.file_id, flags).await; assert!(file_handle.is_ok()); let file_handle = file_handle.unwrap(); assert_eq!(file_handle.file_id, file.file_id); @@ -558,9 +581,16 @@ pub(crate) mod tests { content.as_bytes(), ) .await; + assert!(write_size.is_ok()); assert_eq!(write_size.unwrap(), content.len() as u32); + let result = self + .fs + .flush_file(file_handle.file_id, file_handle.handle_id) + .await; + assert!(result.is_ok()); + self.files.get_mut(&file_handle.file_id).unwrap().size = content.len() as u64; } @@ -606,7 +636,6 @@ pub(crate) mod tests { if !check_child { return; } - assert_eq!(list_dir.len(), self.files.len()); for file_stat in list_dir { assert!(self.files.contains_key(&file_stat.file_id)); let actual_file_stat = self.files.get(&file_stat.file_id).unwrap(); @@ -652,7 +681,7 @@ pub(crate) mod tests { assert_eq!(file_stat.path, path); assert_eq!(file_stat.kind, kind); assert_eq!(file_stat.size, size); - if file_stat.file_id == 1 { + if file_stat.file_id == ROOT_DIR_FILE_ID || file_stat.file_id == FS_META_FILE_ID { assert_eq!(file_stat.parent_file_id, 1); } else { assert!(file_stat.file_id >= INITIAL_FILE_ID); diff --git a/clients/filesystem-fuse/src/fuse_api_handle.rs b/clients/filesystem-fuse/src/fuse_api_handle.rs index 153e323891c..15679a222bd 100644 --- a/clients/filesystem-fuse/src/fuse_api_handle.rs +++ b/clients/filesystem-fuse/src/fuse_api_handle.rs @@ -227,7 +227,7 @@ impl Filesystem for FuseApiHandle { async fn release( &self, - _eq: Request, + _req: Request, inode: Inode, fh: u64, _flags: u32, @@ -237,6 +237,16 @@ impl Filesystem for FuseApiHandle { self.fs.close_file(inode, fh).await } + async fn flush( + &self, + _req: Request, + inode: Inode, + fh: u64, + _lock_owner: u64, + ) -> fuse3::Result<()> { + self.fs.flush_file(inode, fh).await + } + async fn opendir(&self, _req: Request, inode: Inode, flags: u32) -> fuse3::Result { let file_handle = self.fs.open_dir(inode, flags).await?; Ok(ReplyOpen { diff --git a/clients/filesystem-fuse/src/gravitino_client.rs b/clients/filesystem-fuse/src/gravitino_client.rs index e5553c9f6c8..9bdfbb2c288 100644 --- a/clients/filesystem-fuse/src/gravitino_client.rs +++ b/clients/filesystem-fuse/src/gravitino_client.rs @@ -48,6 +48,22 @@ struct FileLocationResponse { location: String, } +#[derive(Debug, Deserialize)] +pub(crate) struct Catalog { + pub(crate) name: String, + #[serde(rename = "type")] + pub(crate) catalog_type: String, + provider: String, + comment: String, + pub(crate) properties: HashMap, +} + +#[derive(Debug, Deserialize)] +struct CatalogResponse { + code: u32, + catalog: Catalog, +} + pub(crate) struct GravitinoClient { gravitino_uri: String, metalake: String, @@ -105,6 +121,26 @@ impl GravitinoClient { Ok(res) } + pub async fn get_catalog_url(&self, catalog_name: &str) -> String { + format!( + "{}/api/metalakes/{}/catalogs/{}", + self.gravitino_uri, self.metalake, catalog_name + ) + } + + pub async fn get_catalog(&self, catalog_name: &str) -> Result { + let url = self.get_catalog_url(catalog_name).await; + let res = self.do_get::(&url).await?; + + if res.code != 0 { + return Err(GvfsError::Error( + ErrorCode::GravitinoClientError, + "Failed to get catalog".to_string(), + )); + } + Ok(res.catalog) + } + pub async fn get_fileset( &self, catalog_name: &str, @@ -257,6 +293,46 @@ mod tests { } } + #[tokio::test] + async fn test_get_catalog_success() { + let catalog_response = r#" + { + "code": 0, + "catalog": { + "name": "example_catalog", + "type": "example_type", + "provider": "example_provider", + "comment": "This is a test catalog", + "properties": { + "key1": "value1", + "key2": "value2" + } + } + }"#; + + let mock_server_url = &mockito::server_url(); + + let url = format!("/api/metalakes/{}/catalogs/{}", "test", "catalog1"); + let _m = mock("GET", url.as_str()) + .with_status(200) + .with_header("content-type", "application/json") + .with_body(catalog_response) + .create(); + + let config = GravitinoConfig { + uri: mock_server_url.to_string(), + metalake: "test".to_string(), + }; + let client = GravitinoClient::new(&config); + + let result = client.get_catalog("catalog1").await; + + match result { + Ok(_) => {} + Err(e) => panic!("Expected Ok, but got Err: {:?}", e), + } + } + async fn get_fileset_example() { tracing_subscriber::fmt::init(); let config = GravitinoConfig { diff --git a/clients/filesystem-fuse/src/gravitino_fileset_filesystem.rs b/clients/filesystem-fuse/src/gravitino_fileset_filesystem.rs index 98a295dbb87..7da2f572dcc 100644 --- a/clients/filesystem-fuse/src/gravitino_fileset_filesystem.rs +++ b/clients/filesystem-fuse/src/gravitino_fileset_filesystem.rs @@ -30,13 +30,15 @@ use std::path::{Path, PathBuf}; pub(crate) struct GravitinoFilesetFileSystem { physical_fs: Box, client: GravitinoClient, - fileset_location: PathBuf, + // location is a absolute path in the physical filesystem that is associated with the fileset. + // e.g. fileset location : s3://bucket/path/to/file the location is /path/to/file + location: PathBuf, } impl GravitinoFilesetFileSystem { pub async fn new( fs: Box, - location: &Path, + target_path: &Path, client: GravitinoClient, _config: &AppConfig, _context: &FileSystemContext, @@ -44,18 +46,25 @@ impl GravitinoFilesetFileSystem { Self { physical_fs: fs, client: client, - fileset_location: location.into(), + location: target_path.into(), } } fn gvfs_path_to_raw_path(&self, path: &Path) -> PathBuf { - self.fileset_location.join(path) + let relation_path = path.strip_prefix("/").expect("path should start with /"); + if relation_path == Path::new("") { + return self.location.clone(); + } + self.location.join(relation_path) } fn raw_path_to_gvfs_path(&self, path: &Path) -> Result { - path.strip_prefix(&self.fileset_location) + let stripped_path = path + .strip_prefix(&self.location) .map_err(|_| Errno::from(libc::EBADF))?; - Ok(path.into()) + let mut result_path = PathBuf::from("/"); + result_path.push(stripped_path); + Ok(result_path) } } @@ -128,3 +137,39 @@ impl PathFileSystem for GravitinoFilesetFileSystem { self.physical_fs.get_capacity() } } + +#[cfg(test)] +mod tests { + use crate::config::GravitinoConfig; + use crate::gravitino_fileset_filesystem::GravitinoFilesetFileSystem; + use crate::memory_filesystem::MemoryFileSystem; + use std::path::Path; + + #[tokio::test] + async fn test_map_fileset_path_to_raw_path() { + let fs = GravitinoFilesetFileSystem { + physical_fs: Box::new(MemoryFileSystem::new().await), + client: super::GravitinoClient::new(&GravitinoConfig::default()), + location: "/c1/fileset1".into(), + }; + let path = fs.gvfs_path_to_raw_path(Path::new("/a")); + assert_eq!(path, Path::new("/c1/fileset1/a")); + let path = fs.gvfs_path_to_raw_path(Path::new("/")); + assert_eq!(path, Path::new("/c1/fileset1")); + } + + #[tokio::test] + async fn test_map_raw_path_to_fileset_path() { + let fs = GravitinoFilesetFileSystem { + physical_fs: Box::new(MemoryFileSystem::new().await), + client: super::GravitinoClient::new(&GravitinoConfig::default()), + location: "/c1/fileset1".into(), + }; + let path = fs + .raw_path_to_gvfs_path(Path::new("/c1/fileset1/a")) + .unwrap(); + assert_eq!(path, Path::new("/a")); + let path = fs.raw_path_to_gvfs_path(Path::new("/c1/fileset1")).unwrap(); + assert_eq!(path, Path::new("/")); + } +} diff --git a/clients/filesystem-fuse/src/gvfs_creator.rs b/clients/filesystem-fuse/src/gvfs_creator.rs new file mode 100644 index 00000000000..aac88ad9d08 --- /dev/null +++ b/clients/filesystem-fuse/src/gvfs_creator.rs @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +use crate::config::AppConfig; +use crate::error::ErrorCode::{InvalidConfig, UnSupportedFilesystem}; +use crate::filesystem::{FileSystemContext, PathFileSystem}; +use crate::gravitino_client::{Catalog, Fileset, GravitinoClient}; +use crate::gravitino_fileset_filesystem::GravitinoFilesetFileSystem; +use crate::gvfs_fuse::{CreateFileSystemResult, FileSystemSchema}; +use crate::s3_filesystem::S3FileSystem; +use crate::utils::{extract_root_path, parse_location, GvfsResult}; + +const GRAVITINO_FILESET_SCHEMA: &str = "gvfs"; + +pub async fn create_gvfs_filesystem( + mount_from: &str, + config: &AppConfig, + fs_context: &FileSystemContext, +) -> GvfsResult { + // Gvfs-fuse filesystem structure: + // FuseApiHandle + // ├─ DefaultRawFileSystem (RawFileSystem) + // │ └─ FileSystemLog (PathFileSystem) + // │ ├─ GravitinoComposedFileSystem (PathFileSystem) + // │ │ ├─ GravitinoFilesetFileSystem (PathFileSystem) + // │ │ │ └─ S3FileSystem (PathFileSystem) + // │ │ │ └─ OpenDALFileSystem (PathFileSystem) + // │ │ ├─ GravitinoFilesetFileSystem (PathFileSystem) + // │ │ │ └─ HDFSFileSystem (PathFileSystem) + // │ │ │ └─ OpenDALFileSystem (PathFileSystem) + // │ │ ├─ GravitinoFilesetFileSystem (PathFileSystem) + // │ │ │ └─ JuiceFileSystem (PathFileSystem) + // │ │ │ └─ NasFileSystem (PathFileSystem) + // │ │ ├─ GravitinoFilesetFileSystem (PathFileSystem) + // │ │ │ └─ XXXFileSystem (PathFileSystem) + // + // `SimpleFileSystem` is a low-level filesystem designed to communicate with FUSE APIs. + // It manages file and directory relationships, as well as file mappings. + // It delegates file operations to the PathFileSystem + // + // `FileSystemLog` is a decorator that adds extra debug logging functionality to file system APIs. + // Similar implementations include permissions, caching, and metrics. + // + // `GravitinoComposeFileSystem` is a composite file system that can combine multiple `GravitinoFilesetFileSystem`. + // It use the part of catalog and schema of fileset path to a find actual GravitinoFilesetFileSystem. delegate the operation to the real storage. + // If the user only mounts a fileset, this layer is not present. There will only be one below layer. + // + // `GravitinoFilesetFileSystem` is a file system that can access a fileset.It translates the fileset path to the real storage path. + // and delegate the operation to the real storage. + // + // `OpenDALFileSystem` is a file system that use the OpenDAL to access real storage. + // it can assess the S3, HDFS, gcs, azblob and other storage. + // + // `S3FileSystem` is a file system that use `OpenDALFileSystem` to access S3 storage. + // + // `HDFSFileSystem` is a file system that use `OpenDALFileSystem` to access HDFS storage. + // + // `NasFileSystem` is a filesystem that uses a locally accessible path mounted by NAS tools, such as JuiceFS. + // + // `JuiceFileSystem` is a file that use `NasFileSystem` to access JuiceFS storage. + // + // `XXXFileSystem is a filesystem that allows you to implement file access through your own extensions. + + let client = GravitinoClient::new(&config.gravitino); + + let (catalog_name, schema_name, fileset_name) = extract_fileset(mount_from)?; + let catalog = client.get_catalog(&catalog_name).await?; + if catalog.catalog_type != "fileset" { + return Err(InvalidConfig.to_error(format!("Catalog {} is not a fileset", catalog_name))); + } + let fileset = client + .get_fileset(&catalog_name, &schema_name, &fileset_name) + .await?; + + let inner_fs = create_fs_with_fileset(&catalog, &fileset, config, fs_context)?; + + let target_path = extract_root_path(fileset.storage_location.as_str())?; + let fs = + GravitinoFilesetFileSystem::new(inner_fs, &target_path, client, config, fs_context).await; + Ok(CreateFileSystemResult::Gvfs(fs)) +} + +fn create_fs_with_fileset( + catalog: &Catalog, + fileset: &Fileset, + config: &AppConfig, + fs_context: &FileSystemContext, +) -> GvfsResult> { + let schema = extract_filesystem_scheme(&fileset.storage_location)?; + + match schema { + FileSystemSchema::S3 => Ok(Box::new(S3FileSystem::new( + catalog, fileset, config, fs_context, + )?)), + } +} + +pub fn extract_fileset(path: &str) -> GvfsResult<(String, String, String)> { + let path = parse_location(path)?; + + if path.scheme() != GRAVITINO_FILESET_SCHEMA { + return Err(InvalidConfig.to_error(format!("Invalid fileset schema: {}", path))); + } + + let split = path.path_segments(); + if split.is_none() { + return Err(InvalidConfig.to_error(format!("Invalid fileset path: {}", path))); + } + let split = split.unwrap().collect::>(); + if split.len() != 4 { + return Err(InvalidConfig.to_error(format!("Invalid fileset path: {}", path))); + } + + let catalog = split[1].to_string(); + let schema = split[2].to_string(); + let fileset = split[3].to_string(); + Ok((catalog, schema, fileset)) +} + +pub fn extract_filesystem_scheme(path: &str) -> GvfsResult { + let url = parse_location(path)?; + let scheme = url.scheme(); + + match scheme { + "s3" => Ok(FileSystemSchema::S3), + "s3a" => Ok(FileSystemSchema::S3), + _ => Err(UnSupportedFilesystem.to_error(format!("Invalid storage schema: {}", path))), + } +} + +#[cfg(test)] +mod tests { + use crate::gvfs_creator::extract_fileset; + use crate::gvfs_fuse::FileSystemSchema; + + #[test] + fn test_extract_fileset() { + let location = "gvfs://fileset/test/c1/s1/fileset1"; + let (catalog, schema, fileset) = extract_fileset(location).unwrap(); + assert_eq!(catalog, "c1"); + assert_eq!(schema, "s1"); + assert_eq!(fileset, "fileset1"); + } + + #[test] + fn test_extract_schema() { + let location = "s3://bucket/path/to/file"; + let schema = super::extract_filesystem_scheme(location).unwrap(); + assert_eq!(schema, FileSystemSchema::S3); + } +} diff --git a/clients/filesystem-fuse/src/gvfs_fuse.rs b/clients/filesystem-fuse/src/gvfs_fuse.rs index d472895d2b3..88079e99b91 100644 --- a/clients/filesystem-fuse/src/gvfs_fuse.rs +++ b/clients/filesystem-fuse/src/gvfs_fuse.rs @@ -18,22 +18,19 @@ */ use crate::config::AppConfig; use crate::default_raw_filesystem::DefaultRawFileSystem; -use crate::error::ErrorCode::{InvalidConfig, UnSupportedFilesystem}; +use crate::error::ErrorCode::UnSupportedFilesystem; use crate::filesystem::FileSystemContext; use crate::fuse_api_handle::FuseApiHandle; use crate::fuse_server::FuseServer; -use crate::gravitino_client::GravitinoClient; use crate::gravitino_fileset_filesystem::GravitinoFilesetFileSystem; +use crate::gvfs_creator::create_gvfs_filesystem; use crate::memory_filesystem::MemoryFileSystem; use crate::utils::GvfsResult; use log::info; use once_cell::sync::Lazy; -use std::path::Path; use std::sync::Arc; use tokio::sync::Mutex; -const FILESET_PREFIX: &str = "gvfs://fileset/"; - static SERVER: Lazy>>> = Lazy::new(|| Mutex::new(None)); pub(crate) enum CreateFileSystemResult { @@ -44,6 +41,7 @@ pub(crate) enum CreateFileSystemResult { None, } +#[derive(Debug, PartialEq)] pub enum FileSystemSchema { S3, } @@ -65,7 +63,7 @@ pub async fn mount(mount_to: &str, mount_from: &str, config: &AppConfig) -> Gvfs } pub async fn unmount() -> GvfsResult<()> { - info!("Stop gvfs-fuse server..."); + info!("Stopping gvfs-fuse server..."); let svr = { let mut server = SERVER.lock().await; if server.is_none() { @@ -127,120 +125,3 @@ pub async fn create_path_fs( create_gvfs_filesystem(mount_from, config, fs_context).await } } - -pub async fn create_gvfs_filesystem( - mount_from: &str, - config: &AppConfig, - fs_context: &FileSystemContext, -) -> GvfsResult { - // Gvfs-fuse filesystem structure: - // FuseApiHandle - // ├─ DefaultRawFileSystem (RawFileSystem) - // │ └─ FileSystemLog (PathFileSystem) - // │ ├─ GravitinoComposedFileSystem (PathFileSystem) - // │ │ ├─ GravitinoFilesetFileSystem (PathFileSystem) - // │ │ │ └─ S3FileSystem (PathFileSystem) - // │ │ │ └─ OpenDALFileSystem (PathFileSystem) - // │ │ ├─ GravitinoFilesetFileSystem (PathFileSystem) - // │ │ │ └─ HDFSFileSystem (PathFileSystem) - // │ │ │ └─ OpenDALFileSystem (PathFileSystem) - // │ │ ├─ GravitinoFilesetFileSystem (PathFileSystem) - // │ │ │ └─ JuiceFileSystem (PathFileSystem) - // │ │ │ └─ NasFileSystem (PathFileSystem) - // │ │ ├─ GravitinoFilesetFileSystem (PathFileSystem) - // │ │ │ └─ XXXFileSystem (PathFileSystem) - // - // `SimpleFileSystem` is a low-level filesystem designed to communicate with FUSE APIs. - // It manages file and directory relationships, as well as file mappings. - // It delegates file operations to the PathFileSystem - // - // `FileSystemLog` is a decorator that adds extra debug logging functionality to file system APIs. - // Similar implementations include permissions, caching, and metrics. - // - // `GravitinoComposeFileSystem` is a composite file system that can combine multiple `GravitinoFilesetFileSystem`. - // It use the part of catalog and schema of fileset path to a find actual GravitinoFilesetFileSystem. delegate the operation to the real storage. - // If the user only mounts a fileset, this layer is not present. There will only be one below layer. - // - // `GravitinoFilesetFileSystem` is a file system that can access a fileset.It translates the fileset path to the real storage path. - // and delegate the operation to the real storage. - // - // `OpenDALFileSystem` is a file system that use the OpenDAL to access real storage. - // it can assess the S3, HDFS, gcs, azblob and other storage. - // - // `S3FileSystem` is a file system that use `OpenDALFileSystem` to access S3 storage. - // - // `HDFSFileSystem` is a file system that use `OpenDALFileSystem` to access HDFS storage. - // - // `NasFileSystem` is a filesystem that uses a locally accessible path mounted by NAS tools, such as JuiceFS. - // - // `JuiceFileSystem` is a file that use `NasFileSystem` to access JuiceFS storage. - // - // `XXXFileSystem is a filesystem that allows you to implement file access through your own extensions. - - let client = GravitinoClient::new(&config.gravitino); - - let (catalog, schema, fileset) = extract_fileset(mount_from)?; - let location = client - .get_fileset(&catalog, &schema, &fileset) - .await? - .storage_location; - let (_schema, location) = extract_storage_filesystem(&location).unwrap(); - - // todo need to replace the inner filesystem with the real storage filesystem - let inner_fs = MemoryFileSystem::new().await; - - let fs = GravitinoFilesetFileSystem::new( - Box::new(inner_fs), - Path::new(&location), - client, - config, - fs_context, - ) - .await; - Ok(CreateFileSystemResult::Gvfs(fs)) -} - -pub fn extract_fileset(path: &str) -> GvfsResult<(String, String, String)> { - if !path.starts_with(FILESET_PREFIX) { - return Err(InvalidConfig.to_error("Invalid fileset path".to_string())); - } - - let path_without_prefix = &path[FILESET_PREFIX.len()..]; - - let parts: Vec<&str> = path_without_prefix.split('/').collect(); - - if parts.len() != 3 { - return Err(InvalidConfig.to_error("Invalid fileset path".to_string())); - } - // todo handle mount catalog or schema - - let catalog = parts[1].to_string(); - let schema = parts[2].to_string(); - let fileset = parts[3].to_string(); - - Ok((catalog, schema, fileset)) -} - -pub fn extract_storage_filesystem(path: &str) -> Option<(FileSystemSchema, String)> { - // todo need to improve the logic - if let Some(pos) = path.find("://") { - let protocol = &path[..pos]; - let location = &path[pos + 3..]; - let location = match location.find('/') { - Some(index) => &location[index + 1..], - None => "", - }; - let location = match location.ends_with('/') { - true => location.to_string(), - false => format!("{}/", location), - }; - - match protocol { - "s3" => Some((FileSystemSchema::S3, location.to_string())), - "s3a" => Some((FileSystemSchema::S3, location.to_string())), - _ => None, - } - } else { - None - } -} diff --git a/clients/filesystem-fuse/src/lib.rs b/clients/filesystem-fuse/src/lib.rs index 5532d619e5c..31e7c7fd8e1 100644 --- a/clients/filesystem-fuse/src/lib.rs +++ b/clients/filesystem-fuse/src/lib.rs @@ -27,10 +27,13 @@ mod fuse_api_handle; mod fuse_server; mod gravitino_client; mod gravitino_fileset_filesystem; +mod gvfs_creator; mod gvfs_fuse; mod memory_filesystem; +mod open_dal_filesystem; mod opened_file; mod opened_file_manager; +mod s3_filesystem; mod utils; pub async fn gvfs_mount(mount_to: &str, mount_from: &str, config: &AppConfig) -> GvfsResult<()> { diff --git a/clients/filesystem-fuse/src/main.rs b/clients/filesystem-fuse/src/main.rs index 8eab5ec0d51..3534e033465 100644 --- a/clients/filesystem-fuse/src/main.rs +++ b/clients/filesystem-fuse/src/main.rs @@ -26,21 +26,37 @@ use tokio::signal; async fn main() -> fuse3::Result<()> { tracing_subscriber::fmt().init(); + // todo need inmprove the args parsing + let args: Vec = std::env::args().collect(); + let (mount_point, mount_from, config_path) = match args.len() { + 4 => (args[1].clone(), args[2].clone(), args[3].clone()), + _ => { + error!("Usage: {} ", args[0]); + return Err(Errno::from(libc::EINVAL)); + } + }; + //todo(read config file from args) - let config = AppConfig::from_file(Some("conf/gvfs_fuse.toml")); + let config = AppConfig::from_file(Some(&config_path)); if let Err(e) = &config { error!("Failed to load config: {:?}", e); return Err(Errno::from(libc::EINVAL)); } let config = config.unwrap(); - let handle = tokio::spawn(async move { gvfs_mount("gvfs", "", &config).await }); - - let _ = signal::ctrl_c().await; - info!("Received Ctrl+C, Unmounting gvfs..."); + let handle = tokio::spawn(async move { + let result = gvfs_mount(&mount_point, &mount_from, &config).await; + if let Err(e) = result { + error!("Failed to mount gvfs: {:?}", e); + return Err(Errno::from(libc::EINVAL)); + } + Ok(()) + }); - if let Err(e) = handle.await { - error!("Failed to mount gvfs: {:?}", e); - return Err(Errno::from(libc::EINVAL)); + tokio::select! { + _ = handle => {} + _ = signal::ctrl_c() => { + info!("Received Ctrl+C, unmounting gvfs..."); + } } let _ = gvfs_unmount().await; diff --git a/clients/filesystem-fuse/src/memory_filesystem.rs b/clients/filesystem-fuse/src/memory_filesystem.rs index b94d16b8d39..f56e65ea33a 100644 --- a/clients/filesystem-fuse/src/memory_filesystem.rs +++ b/clients/filesystem-fuse/src/memory_filesystem.rs @@ -42,8 +42,6 @@ pub struct MemoryFileSystem { } impl MemoryFileSystem { - const FS_META_FILE_NAME: &'static str = "/.gvfs_meta"; - pub(crate) async fn new() -> Self { Self { file_map: RwLock::new(Default::default()), @@ -69,16 +67,6 @@ impl PathFileSystem for MemoryFileSystem { }; let root_path = PathBuf::from("/"); self.file_map.write().unwrap().insert(root_path, root_file); - - let meta_file = MemoryFile { - kind: RegularFile, - data: Arc::new(Mutex::new(Vec::new())), - }; - let meta_file_path = Path::new(Self::FS_META_FILE_NAME).to_path_buf(); - self.file_map - .write() - .unwrap() - .insert(meta_file_path, meta_file); Ok(()) } @@ -248,7 +236,10 @@ fn path_in_dir(dir: &Path, path: &Path) -> bool { #[cfg(test)] mod tests { use super::*; - use crate::filesystem::tests::TestPathFileSystem; + use crate::config::AppConfig; + use crate::default_raw_filesystem::DefaultRawFileSystem; + use crate::filesystem::tests::{TestPathFileSystem, TestRawFileSystem}; + use crate::filesystem::{FileSystemContext, RawFileSystem}; #[test] fn test_path_in_dir() { @@ -281,7 +272,20 @@ mod tests { async fn test_memory_file_system() { let fs = MemoryFileSystem::new().await; let _ = fs.init().await; - let mut tester = TestPathFileSystem::new(fs); + let mut tester = TestPathFileSystem::new(Path::new("/ab"), fs); tester.test_path_file_system().await; } + + #[tokio::test] + async fn test_memory_file_system_with_raw_file_system() { + let memory_fs = MemoryFileSystem::new().await; + let raw_fs = DefaultRawFileSystem::new( + memory_fs, + &AppConfig::default(), + &FileSystemContext::default(), + ); + let _ = raw_fs.init().await; + let mut tester = TestRawFileSystem::new(Path::new("/ab"), raw_fs); + tester.test_raw_file_system().await; + } } diff --git a/clients/filesystem-fuse/src/open_dal_filesystem.rs b/clients/filesystem-fuse/src/open_dal_filesystem.rs new file mode 100644 index 00000000000..e53fbaf6032 --- /dev/null +++ b/clients/filesystem-fuse/src/open_dal_filesystem.rs @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +use crate::config::AppConfig; +use crate::filesystem::{ + FileReader, FileStat, FileSystemCapacity, FileSystemContext, FileWriter, PathFileSystem, Result, +}; +use crate::opened_file::{OpenFileFlags, OpenedFile}; +use async_trait::async_trait; +use bytes::Bytes; +use fuse3::FileType::{Directory, RegularFile}; +use fuse3::{Errno, FileType, Timestamp}; +use log::error; +use opendal::{EntryMode, ErrorKind, Metadata, Operator}; +use std::path::{Path, PathBuf}; +use std::time::SystemTime; + +pub(crate) struct OpenDalFileSystem { + op: Operator, +} + +impl OpenDalFileSystem {} + +impl OpenDalFileSystem { + pub(crate) fn new(op: Operator, _config: &AppConfig, _fs_context: &FileSystemContext) -> Self { + Self { op: op } + } + + fn opendal_meta_to_file_stat(&self, meta: &Metadata, file_stat: &mut FileStat) { + let now = SystemTime::now(); + let mtime = meta.last_modified().map(|x| x.into()).unwrap_or(now); + + file_stat.size = meta.content_length(); + file_stat.kind = opendal_filemode_to_filetype(meta.mode()); + file_stat.ctime = Timestamp::from(mtime); + file_stat.atime = Timestamp::from(now); + file_stat.mtime = Timestamp::from(mtime); + } +} + +#[async_trait] +impl PathFileSystem for OpenDalFileSystem { + async fn init(&self) -> Result<()> { + Ok(()) + } + + async fn stat(&self, path: &Path) -> Result { + let file_name = path.to_string_lossy().to_string(); + let meta_result = self.op.stat(&file_name).await; + + // path may be a directory, so try to stat it as a directory + let meta = match meta_result { + Ok(meta) => meta, + Err(err) => { + if err.kind() == ErrorKind::NotFound { + let dir_name = build_dir_path(path); + self.op + .stat(&dir_name) + .await + .map_err(opendal_error_to_errno)? + } else { + return Err(opendal_error_to_errno(err)); + } + } + }; + + let mut file_stat = FileStat::new_file_filestat_with_path(path, 0); + self.opendal_meta_to_file_stat(&meta, &mut file_stat); + + Ok(file_stat) + } + + async fn read_dir(&self, path: &Path) -> Result> { + // dir name should end with '/' in opendal. + let dir_name = build_dir_path(path); + let entries = self + .op + .list(&dir_name) + .await + .map_err(opendal_error_to_errno)?; + entries + .iter() + .map(|entry| { + let mut path = PathBuf::from(path); + path.push(entry.name()); + + let mut file_stat = FileStat::new_file_filestat_with_path(&path, 0); + self.opendal_meta_to_file_stat(entry.metadata(), &mut file_stat); + Ok(file_stat) + }) + .collect() + } + + async fn open_file(&self, path: &Path, flags: OpenFileFlags) -> Result { + let file_stat = self.stat(path).await?; + debug_assert!(file_stat.kind == RegularFile); + + let mut file = OpenedFile::new(file_stat); + let file_name = path.to_string_lossy().to_string(); + if flags.is_read() { + let reader = self + .op + .reader_with(&file_name) + .await + .map_err(opendal_error_to_errno)?; + file.reader = Some(Box::new(FileReaderImpl { reader })); + } + if flags.is_write() || flags.is_create() || flags.is_append() || flags.is_truncate() { + let writer = self + .op + .writer_with(&file_name) + .await + .map_err(opendal_error_to_errno)?; + file.writer = Some(Box::new(FileWriterImpl { writer })); + } + Ok(file) + } + + async fn open_dir(&self, path: &Path, _flags: OpenFileFlags) -> Result { + let file_stat = self.stat(path).await?; + debug_assert!(file_stat.kind == Directory); + + let opened_file = OpenedFile::new(file_stat); + Ok(opened_file) + } + + async fn create_file(&self, path: &Path, flags: OpenFileFlags) -> Result { + let file_name = path.to_string_lossy().to_string(); + + let mut writer = self + .op + .writer_with(&file_name) + .await + .map_err(opendal_error_to_errno)?; + + writer.close().await.map_err(opendal_error_to_errno)?; + + let file = self.open_file(path, flags).await?; + Ok(file) + } + + async fn create_dir(&self, path: &Path) -> Result { + let dir_name = build_dir_path(path); + self.op + .create_dir(&dir_name) + .await + .map_err(opendal_error_to_errno)?; + let file_stat = self.stat(path).await?; + Ok(file_stat) + } + + async fn set_attr(&self, _path: &Path, _file_stat: &FileStat, _flush: bool) -> Result<()> { + // no need to implement + Ok(()) + } + + async fn remove_file(&self, path: &Path) -> Result<()> { + let file_name = path.to_string_lossy().to_string(); + self.op + .remove(vec![file_name]) + .await + .map_err(opendal_error_to_errno) + } + + async fn remove_dir(&self, path: &Path) -> Result<()> { + //todo:: need to consider keeping the behavior of posix remove dir when the dir is not empty + let dir_name = build_dir_path(path); + self.op + .remove(vec![dir_name]) + .await + .map_err(opendal_error_to_errno) + } + + fn get_capacity(&self) -> Result { + Ok(FileSystemCapacity {}) + } +} + +struct FileReaderImpl { + reader: opendal::Reader, +} + +#[async_trait] +impl FileReader for FileReaderImpl { + async fn read(&mut self, offset: u64, size: u32) -> Result { + let end = offset + size as u64; + let v = self + .reader + .read(offset..end) + .await + .map_err(opendal_error_to_errno)?; + Ok(v.to_bytes()) + } +} + +struct FileWriterImpl { + writer: opendal::Writer, +} + +#[async_trait] +impl FileWriter for FileWriterImpl { + async fn write(&mut self, _offset: u64, data: &[u8]) -> Result { + self.writer + .write(data.to_vec()) + .await + .map_err(opendal_error_to_errno)?; + Ok(data.len() as u32) + } + + async fn close(&mut self) -> Result<()> { + self.writer.close().await.map_err(opendal_error_to_errno)?; + Ok(()) + } +} + +fn build_dir_path(path: &Path) -> String { + let mut dir_path = path.to_string_lossy().to_string(); + if !dir_path.ends_with('/') { + dir_path.push('/'); + } + dir_path +} + +fn opendal_error_to_errno(err: opendal::Error) -> Errno { + error!("opendal operator error {:?}", err); + match err.kind() { + ErrorKind::Unsupported => Errno::from(libc::EOPNOTSUPP), + ErrorKind::IsADirectory => Errno::from(libc::EISDIR), + ErrorKind::NotFound => Errno::from(libc::ENOENT), + ErrorKind::PermissionDenied => Errno::from(libc::EACCES), + ErrorKind::AlreadyExists => Errno::from(libc::EEXIST), + ErrorKind::NotADirectory => Errno::from(libc::ENOTDIR), + ErrorKind::RateLimited => Errno::from(libc::EBUSY), + _ => Errno::from(libc::ENOENT), + } +} + +fn opendal_filemode_to_filetype(mode: EntryMode) -> FileType { + match mode { + EntryMode::DIR => Directory, + _ => RegularFile, + } +} + +#[cfg(test)] +mod test { + use crate::config::AppConfig; + use crate::s3_filesystem::extract_s3_config; + use opendal::layers::LoggingLayer; + use opendal::{services, Builder, Operator}; + + #[tokio::test] + async fn test_s3_stat() { + let config = AppConfig::from_file(Some("tests/conf/gvfs_fuse_s3.toml")).unwrap(); + let opendal_config = extract_s3_config(&config); + + let builder = services::S3::from_map(opendal_config); + + // Init an operator + let op = Operator::new(builder) + .expect("opendal create failed") + .layer(LoggingLayer::default()) + .finish(); + + let path = "/"; + let list = op.list(path).await; + if let Ok(l) = list { + for i in l { + println!("list result: {:?}", i); + } + } else { + println!("list error: {:?}", list.err()); + } + + let meta = op.stat_with(path).await; + if let Ok(m) = meta { + println!("stat result: {:?}", m); + } else { + println!("stat error: {:?}", meta.err()); + } + } +} diff --git a/clients/filesystem-fuse/src/opened_file.rs b/clients/filesystem-fuse/src/opened_file.rs index 5bc961c9a6b..0c630e07217 100644 --- a/clients/filesystem-fuse/src/opened_file.rs +++ b/clients/filesystem-fuse/src/opened_file.rs @@ -122,6 +122,32 @@ pub(crate) struct FileHandle { // OpenFileFlags is the open file flags for the file system. pub(crate) struct OpenFileFlags(pub(crate) u32); +impl OpenFileFlags { + pub fn is_read(&self) -> bool { + (self.0 & libc::O_WRONLY as u32) == 0 + } + + pub fn is_write(&self) -> bool { + (self.0 & libc::O_WRONLY as u32) != 0 || (self.0 & libc::O_RDWR as u32) != 0 + } + + pub fn is_append(&self) -> bool { + (self.0 & libc::O_APPEND as u32) != 0 + } + + pub fn is_create(&self) -> bool { + (self.0 & libc::O_CREAT as u32) != 0 + } + + pub fn is_truncate(&self) -> bool { + (self.0 & libc::O_TRUNC as u32) != 0 + } + + pub fn is_exclusive(&self) -> bool { + (self.0 & libc::O_EXCL as u32) != 0 + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/clients/filesystem-fuse/src/s3_filesystem.rs b/clients/filesystem-fuse/src/s3_filesystem.rs new file mode 100644 index 00000000000..e0ca69b4ccf --- /dev/null +++ b/clients/filesystem-fuse/src/s3_filesystem.rs @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +use crate::config::AppConfig; +use crate::error::ErrorCode::{InvalidConfig, OpenDalError}; +use crate::filesystem::{FileStat, FileSystemCapacity, FileSystemContext, PathFileSystem, Result}; +use crate::gravitino_client::{Catalog, Fileset}; +use crate::open_dal_filesystem::OpenDalFileSystem; +use crate::opened_file::{OpenFileFlags, OpenedFile}; +use crate::utils::{parse_location, GvfsResult}; +use async_trait::async_trait; +use log::error; +use opendal::layers::LoggingLayer; +use opendal::services::S3; +use opendal::{Builder, Operator}; +use std::collections::HashMap; +use std::path::Path; + +pub(crate) struct S3FileSystem { + open_dal_fs: OpenDalFileSystem, +} + +impl S3FileSystem {} + +impl S3FileSystem { + const S3_CONFIG_PREFIX: &'static str = "s3-"; + + pub(crate) fn new( + catalog: &Catalog, + fileset: &Fileset, + config: &AppConfig, + _fs_context: &FileSystemContext, + ) -> GvfsResult { + let mut opendal_config = extract_s3_config(config); + let bucket = extract_bucket(&fileset.storage_location)?; + opendal_config.insert("bucket".to_string(), bucket); + + let region = Self::get_s3_region(catalog)?; + opendal_config.insert("region".to_string(), region); + + let builder = S3::from_map(opendal_config); + + let op = Operator::new(builder); + if let Err(e) = op { + error!("opendal create failed: {:?}", e); + return Err(OpenDalError.to_error(format!("opendal create failed: {:?}", e))); + } + let op = op.unwrap().layer(LoggingLayer::default()).finish(); + let open_dal_fs = OpenDalFileSystem::new(op, config, _fs_context); + Ok(Self { + open_dal_fs: open_dal_fs, + }) + } + + fn get_s3_region(catalog: &Catalog) -> GvfsResult { + if let Some(region) = catalog.properties.get("s3-region") { + Ok(region.clone()) + } else if let Some(endpoint) = catalog.properties.get("s3-endpoint") { + extract_region(endpoint) + } else { + Err(InvalidConfig.to_error(format!( + "Cant not retrieve region in the Catalog {}", + catalog.name + ))) + } + } +} + +#[async_trait] +impl PathFileSystem for S3FileSystem { + async fn init(&self) -> Result<()> { + Ok(()) + } + + async fn stat(&self, path: &Path) -> Result { + self.open_dal_fs.stat(path).await + } + + async fn read_dir(&self, path: &Path) -> Result> { + self.open_dal_fs.read_dir(path).await + } + + async fn open_file(&self, path: &Path, flags: OpenFileFlags) -> Result { + self.open_dal_fs.open_file(path, flags).await + } + + async fn open_dir(&self, path: &Path, flags: OpenFileFlags) -> Result { + self.open_dal_fs.open_dir(path, flags).await + } + + async fn create_file(&self, path: &Path, flags: OpenFileFlags) -> Result { + self.open_dal_fs.create_file(path, flags).await + } + + async fn create_dir(&self, path: &Path) -> Result { + self.open_dal_fs.create_dir(path).await + } + + async fn set_attr(&self, path: &Path, file_stat: &FileStat, flush: bool) -> Result<()> { + self.open_dal_fs.set_attr(path, file_stat, flush).await + } + + async fn remove_file(&self, path: &Path) -> Result<()> { + self.open_dal_fs.remove_file(path).await + } + + async fn remove_dir(&self, path: &Path) -> Result<()> { + self.open_dal_fs.remove_dir(path).await + } + + fn get_capacity(&self) -> Result { + self.open_dal_fs.get_capacity() + } +} + +pub(crate) fn extract_bucket(location: &str) -> GvfsResult { + let url = parse_location(location)?; + match url.host_str() { + Some(host) => Ok(host.to_string()), + None => Err(InvalidConfig.to_error(format!( + "Invalid fileset location without bucket: {}", + location + ))), + } +} + +pub(crate) fn extract_region(location: &str) -> GvfsResult { + let url = parse_location(location)?; + match url.host_str() { + Some(host) => { + let parts: Vec<&str> = host.split('.').collect(); + if parts.len() > 1 { + Ok(parts[1].to_string()) + } else { + Err(InvalidConfig.to_error(format!( + "Invalid location: expected region in host, got {}", + location + ))) + } + } + None => Err(InvalidConfig.to_error(format!( + "Invalid fileset location without bucket: {}", + location + ))), + } +} + +pub fn extract_s3_config(config: &AppConfig) -> HashMap { + config + .extend_config + .clone() + .into_iter() + .filter_map(|(k, v)| { + if k.starts_with(S3FileSystem::S3_CONFIG_PREFIX) { + Some(( + k.strip_prefix(S3FileSystem::S3_CONFIG_PREFIX) + .unwrap() + .to_string(), + v, + )) + } else { + None + } + }) + .collect() +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::default_raw_filesystem::DefaultRawFileSystem; + use crate::filesystem::tests::{TestPathFileSystem, TestRawFileSystem}; + use crate::filesystem::RawFileSystem; + use opendal::layers::TimeoutLayer; + use std::time::Duration; + + #[test] + fn test_extract_bucket() { + let location = "s3://bucket/path/to/file"; + let result = extract_bucket(location); + assert!(result.is_ok()); + assert_eq!(result.unwrap(), "bucket"); + } + + #[test] + fn test_extract_region() { + let location = "http://s3.ap-southeast-2.amazonaws.com"; + let result = extract_region(location); + assert!(result.is_ok()); + assert_eq!(result.unwrap(), "ap-southeast-2"); + } + + async fn delete_dir(op: &Operator, dir_name: &str) { + let childs = op.list(dir_name).await.expect("list dir failed"); + for child in childs { + let child_name = dir_name.to_string() + child.name(); + if child.metadata().is_dir() { + Box::pin(delete_dir(op, &child_name)).await; + } else { + op.delete(&child_name).await.expect("delete file failed"); + } + } + op.delete(dir_name).await.expect("delete dir failed"); + } + + async fn create_s3_fs(cwd: &Path) -> S3FileSystem { + let config = AppConfig::from_file(Some("tests/conf/gvfs_fuse_s3.toml")).unwrap(); + let opendal_config = extract_s3_config(&config); + + let fs_context = FileSystemContext::default(); + + let builder = S3::from_map(opendal_config); + let op = Operator::new(builder) + .expect("opendal create failed") + .layer(LoggingLayer::default()) + .layer( + TimeoutLayer::new() + .with_timeout(Duration::from_secs(300)) + .with_io_timeout(Duration::from_secs(300)), + ) + .finish(); + + // clean up the test directory + let file_name = cwd.to_string_lossy().to_string() + "/"; + delete_dir(&op, &file_name).await; + op.create_dir(&file_name) + .await + .expect("create test dir failed"); + + let open_dal_fs = OpenDalFileSystem::new(op, &config, &fs_context); + S3FileSystem { open_dal_fs } + } + + #[tokio::test] + async fn test_s3_file_system() { + if std::env::var("RUN_S3_TESTS").is_err() { + return; + } + let cwd = Path::new("/gvfs_test1"); + let fs = create_s3_fs(cwd).await; + + let _ = fs.init().await; + let mut tester = TestPathFileSystem::new(cwd, fs); + tester.test_path_file_system().await; + } + + #[tokio::test] + async fn test_s3_file_system_with_raw_file_system() { + if std::env::var("RUN_S3_TESTS").is_err() { + return; + } + + let cwd = Path::new("/gvfs_test2"); + let s3_fs = create_s3_fs(cwd).await; + let raw_fs = + DefaultRawFileSystem::new(s3_fs, &AppConfig::default(), &FileSystemContext::default()); + let _ = raw_fs.init().await; + let mut tester = TestRawFileSystem::new(cwd, raw_fs); + tester.test_raw_file_system().await; + } +} diff --git a/clients/filesystem-fuse/src/utils.rs b/clients/filesystem-fuse/src/utils.rs index bbc8d7d7f8a..53eb9179d71 100644 --- a/clients/filesystem-fuse/src/utils.rs +++ b/clients/filesystem-fuse/src/utils.rs @@ -16,9 +16,36 @@ * specific language governing permissions and limitations * under the License. */ +use crate::error::ErrorCode::InvalidConfig; use crate::error::GvfsError; +use reqwest::Url; +use std::path::PathBuf; pub type GvfsResult = Result; +pub(crate) fn parse_location(location: &str) -> GvfsResult { + let parsed_url = Url::parse(location); + if let Err(e) = parsed_url { + return Err(InvalidConfig.to_error(format!("Invalid fileset location: {}", e))); + } + Ok(parsed_url.unwrap()) +} + +pub(crate) fn extract_root_path(location: &str) -> GvfsResult { + let url = parse_location(location)?; + Ok(PathBuf::from(url.path())) +} + #[cfg(test)] -mod tests {} +mod tests { + use crate::utils::extract_root_path; + use std::path::PathBuf; + + #[test] + fn test_extract_root_path() { + let location = "s3://bucket/path/to/file"; + let result = extract_root_path(location); + assert!(result.is_ok()); + assert_eq!(result.unwrap(), PathBuf::from("/path/to/file")); + } +} diff --git a/clients/filesystem-fuse/tests/conf/gvfs_fuse_test.toml b/clients/filesystem-fuse/tests/conf/config_test.toml similarity index 91% rename from clients/filesystem-fuse/tests/conf/gvfs_fuse_test.toml rename to clients/filesystem-fuse/tests/conf/config_test.toml index ff7c6936f37..524e0aa94fb 100644 --- a/clients/filesystem-fuse/tests/conf/gvfs_fuse_test.toml +++ b/clients/filesystem-fuse/tests/conf/config_test.toml @@ -34,7 +34,7 @@ block_size = 8192 uri = "http://localhost:8090" metalake = "test" -# extent settings +# extend settings [extend_config] -access_key = "XXX_access_key" -secret_key = "XXX_secret_key" +s3-access_key_id = "XXX_access_key" +s3-secret_access_key = "XXX_secret_key" diff --git a/clients/filesystem-fuse/tests/conf/gvfs_fuse_memory.toml b/clients/filesystem-fuse/tests/conf/gvfs_fuse_memory.toml index 013df6cfc31..0ec447cd087 100644 --- a/clients/filesystem-fuse/tests/conf/gvfs_fuse_memory.toml +++ b/clients/filesystem-fuse/tests/conf/gvfs_fuse_memory.toml @@ -34,7 +34,7 @@ block_size = 8192 uri = "http://localhost:8090" metalake = "test" -# extent settings -[extent_config] -access_key = "XXX_access_key" -secret_key = "XXX_secret_key" +# extend settings +[extend_config] +s3-access_key_id = "XXX_access_key" +s3-secret_access_key = "XXX_secret_key" diff --git a/clients/filesystem-fuse/tests/conf/gvfs_fuse_s3.toml b/clients/filesystem-fuse/tests/conf/gvfs_fuse_s3.toml new file mode 100644 index 00000000000..7d182cd40df --- /dev/null +++ b/clients/filesystem-fuse/tests/conf/gvfs_fuse_s3.toml @@ -0,0 +1,43 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# fuse settings +[fuse] +file_mask= 0o600 +dir_mask= 0o700 +fs_type = "memory" + +[fuse.properties] +key1 = "value1" +key2 = "value2" + +# filesystem settings +[filesystem] +block_size = 8192 + +# Gravitino settings +[gravitino] +uri = "http://localhost:8090" +metalake = "test" + +# extend settings +[extend_config] +s3-access_key_id = "XXX_access_key" +s3-secret_access_key = "XXX_secret_key" +s3-region = "XXX_region" +s3-bucket = "XXX_bucket" + diff --git a/clients/filesystem-fuse/tests/fuse_test.rs b/clients/filesystem-fuse/tests/fuse_test.rs index e761fabc5b6..d06199d782e 100644 --- a/clients/filesystem-fuse/tests/fuse_test.rs +++ b/clients/filesystem-fuse/tests/fuse_test.rs @@ -17,15 +17,16 @@ * under the License. */ +use fuse3::Errno; use gvfs_fuse::config::AppConfig; use gvfs_fuse::{gvfs_mount, gvfs_unmount}; -use log::info; -use std::fs; +use log::{error, info}; use std::fs::File; use std::path::Path; use std::sync::Arc; use std::thread::sleep; use std::time::{Duration, Instant}; +use std::{fs, panic, process}; use tokio::runtime::Runtime; use tokio::task::JoinHandle; @@ -42,8 +43,14 @@ impl FuseTest { let config = AppConfig::from_file(Some("tests/conf/gvfs_fuse_memory.toml")) .expect("Failed to load config"); - self.runtime - .spawn(async move { gvfs_mount(&mount_point, "", &config).await }); + self.runtime.spawn(async move { + let result = gvfs_mount(&mount_point, "", &config).await; + if let Err(e) = result { + error!("Failed to mount gvfs: {:?}", e); + return Err(Errno::from(libc::EINVAL)); + } + Ok(()) + }); let success = Self::wait_for_fuse_server_ready(&self.mount_point, Duration::from_secs(15)); assert!(success, "Fuse server cannot start up at 15 seconds"); } @@ -60,6 +67,7 @@ impl FuseTest { while start_time.elapsed() < timeout { if file_exists(&test_file) { + info!("Fuse server is ready",); return true; } info!("Wait for fuse server ready",); @@ -80,6 +88,11 @@ impl Drop for FuseTest { fn test_fuse_system_with_auto() { tracing_subscriber::fmt().init(); + panic::set_hook(Box::new(|info| { + error!("A panic occurred: {:?}", info); + process::exit(1); + })); + let mount_point = "target/gvfs"; let _ = fs::create_dir_all(mount_point);